# Multi-Agent AI Collaboration System
# Author: Spencer Purdy
# Description: Enterprise-grade multi-agent system with specialized AI agents collaborating
# to solve complex problems through intelligent task decomposition and parallel processing.
# Installation (uncomment for Google Colab)
# !pip install gradio langchain langchain-openai openai networkx matplotlib aiohttp pandas numpy plotly python-dotenv pydantic scipy
import os
import json
import time
import asyncio
import hashlib
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional, Any, Union, Set
from dataclasses import dataclass, field
from enum import Enum
import warnings
warnings.filterwarnings('ignore')
# Core libraries
import gradio as gr
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.patches import FancyBboxPatch
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
# LangChain and AI libraries
from langchain.schema import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langchain.callbacks.base import BaseCallbackHandler
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferMemory
from pydantic import BaseModel, Field
# Async libraries
import aiohttp
from concurrent.futures import ThreadPoolExecutor, as_completed
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class Config:
"""Configuration settings for the multi-agent system."""
# Model settings
DEFAULT_MODEL = "gpt-4"
TEMPERATURE = 0.7
MAX_TOKENS = 1500
# Agent settings
MAX_ITERATIONS = 10
COLLABORATION_TIMEOUT = 300 # seconds
# Visualization settings
GRAPH_UPDATE_INTERVAL = 0.5 # seconds
NODE_COLORS = {
'Researcher': '#3498db',
'Analyst': '#e74c3c',
'Critic': '#f39c12',
'Synthesizer': '#2ecc71',
'Coordinator': '#9b59b6'
}
# Performance settings
ENABLE_PERFORMANCE_TRACKING = True
BENCHMARK_BASELINE = {
"single_agent_time": 45.0,
"single_agent_quality": 0.72
}
# Report settings
CONFIDENCE_THRESHOLD = 0.7
MAX_REPORT_SECTIONS = 10
COMPANY_NAME = "Multi-Agent AI Platform"
# Demo settings
DEMO_MODE_ENABLED = True
DEMO_PROBLEMS = [
"Analyze the impact of remote work on team productivity and collaboration",
"Develop a strategy for sustainable urban transportation",
"Evaluate the risks and benefits of AI in healthcare",
"Design a framework for ethical AI development",
"Create a plan for digital transformation in education"
]
class AgentRole(Enum):
"""Enumeration of agent roles in the system."""
RESEARCHER = "Researcher"
ANALYST = "Analyst"
CRITIC = "Critic"
SYNTHESIZER = "Synthesizer"
COORDINATOR = "Coordinator"
class TaskStatus(Enum):
"""Task execution status."""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
class MessageType(Enum):
"""Types of messages between agents."""
TASK_ASSIGNMENT = "task_assignment"
COLLABORATION_REQUEST = "collaboration_request"
INFORMATION_SHARING = "information_sharing"
FEEDBACK = "feedback"
COMPLETION_REPORT = "completion_report"
@dataclass
class Task:
"""Represents a task to be executed by agents."""
id: str
description: str
assigned_to: Optional[str] = None
status: TaskStatus = TaskStatus.PENDING
dependencies: List[str] = field(default_factory=list)
result: Optional[Any] = None
confidence: float = 0.0
created_at: datetime = field(default_factory=datetime.now)
completed_at: Optional[datetime] = None
metadata: Dict[str, Any] = field(default_factory=dict)
performance_metrics: Dict[str, float] = field(default_factory=dict)
@dataclass
class AgentMessage:
"""Message passed between agents."""
sender: str
recipient: str
content: str
message_type: MessageType
metadata: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
priority: int = 1 # 1 (low) to 5 (high)
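# Usage sketch (illustrative values; kept in comments so importing this module
# stays side-effect free):
#   task = Task(id="task_1", description="Survey remote-work productivity studies")
#   msg = AgentMessage(sender="Researcher-1", recipient="Analyst-1",
#                      content="Initial findings attached.",
#                      message_type=MessageType.INFORMATION_SHARING, priority=3)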
class PerformanceTracker:
"""Tracks performance metrics for the multi-agent system."""
def __init__(self):
self.metrics = {
'task_completion_times': [],
'agent_utilization': {},
'collaboration_count': 0,
'total_messages': 0,
'quality_scores': [],
'system_start_time': None,
'system_end_time': None
}
def start_tracking(self):
"""Start performance tracking."""
self.metrics['system_start_time'] = datetime.now()
def end_tracking(self):
"""End performance tracking."""
self.metrics['system_end_time'] = datetime.now()
def record_task_completion(self, task: Task):
"""Record task completion metrics."""
if task.created_at and task.completed_at:
completion_time = (task.completed_at - task.created_at).total_seconds()
self.metrics['task_completion_times'].append(completion_time)
def record_agent_activity(self, agent_name: str, activity_duration: float):
"""Record agent activity duration."""
if agent_name not in self.metrics['agent_utilization']:
self.metrics['agent_utilization'][agent_name] = 0
self.metrics['agent_utilization'][agent_name] += activity_duration
def record_collaboration(self):
"""Record a collaboration event."""
self.metrics['collaboration_count'] += 1
def record_message(self):
"""Record a message exchange."""
self.metrics['total_messages'] += 1
def get_performance_summary(self) -> Dict[str, Any]:
"""Get performance summary statistics."""
total_time = 0
if self.metrics['system_start_time'] and self.metrics['system_end_time']:
total_time = (self.metrics['system_end_time'] - self.metrics['system_start_time']).total_seconds()
avg_task_time = np.mean(self.metrics['task_completion_times']) if self.metrics['task_completion_times'] else 0
baseline_time = Config.BENCHMARK_BASELINE['single_agent_time']
time_improvement = ((baseline_time - avg_task_time) / baseline_time * 100) if avg_task_time > 0 else 0
return {
'total_execution_time': total_time,
'average_task_completion_time': avg_task_time,
'total_collaborations': self.metrics['collaboration_count'],
'total_messages': self.metrics['total_messages'],
'agent_utilization': self.metrics['agent_utilization'],
'time_improvement_percentage': time_improvement,
'efficiency_score': self._calculate_efficiency_score()
}
def _calculate_efficiency_score(self) -> float:
"""Calculate overall efficiency score."""
factors = []
        if self.metrics['task_completion_times']:
            avg_time = np.mean(self.metrics['task_completion_times'])
            speed_factor = min(1.0, Config.BENCHMARK_BASELINE['single_agent_time'] / avg_time) if avg_time > 0 else 1.0
            factors.append(speed_factor)
if self.metrics['total_messages'] > 0:
collab_factor = min(1.0, self.metrics['collaboration_count'] / self.metrics['total_messages'])
factors.append(collab_factor)
if self.metrics['agent_utilization']:
utilization_values = list(self.metrics['agent_utilization'].values())
if utilization_values:
avg_utilization = np.mean(utilization_values)
max_utilization = max(utilization_values)
balance_factor = avg_utilization / max_utilization if max_utilization > 0 else 0
factors.append(balance_factor)
return np.mean(factors) if factors else 0.5
class AgentMemory:
"""Manages agent conversation history and context."""
def __init__(self, max_messages: int = 50):
self.messages: List[AgentMessage] = []
self.max_messages = max_messages
self.context: Dict[str, Any] = {}
self.knowledge_base: Dict[str, Any] = {}
def add_message(self, message: AgentMessage):
"""Add a message to memory."""
self.messages.append(message)
if len(self.messages) > self.max_messages:
self.messages.pop(0)
self._extract_knowledge(message)
def get_recent_messages(self, n: int = 10) -> List[AgentMessage]:
"""Get n most recent messages."""
return self.messages[-n:]
def get_messages_by_sender(self, sender: str) -> List[AgentMessage]:
"""Get all messages from a specific sender."""
return [msg for msg in self.messages if msg.sender == sender]
def get_high_priority_messages(self) -> List[AgentMessage]:
"""Get high priority messages."""
return [msg for msg in self.messages if msg.priority >= 4]
def update_context(self, key: str, value: Any):
"""Update context information."""
self.context[key] = value
def get_context(self, key: str) -> Any:
"""Get context information."""
return self.context.get(key)
def _extract_knowledge(self, message: AgentMessage):
"""Extract and store important knowledge from messages."""
keywords = ['finding', 'conclusion', 'recommendation', 'insight', 'pattern']
content_lower = message.content.lower()
for keyword in keywords:
if keyword in content_lower:
knowledge_key = f"{message.sender}_{keyword}_{len(self.knowledge_base)}"
self.knowledge_base[knowledge_key] = {
'content': message.content,
'sender': message.sender,
'timestamp': message.timestamp,
'type': keyword
}
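# Usage sketch for AgentMemory (illustrative, in comments only):
#   memory = AgentMemory(max_messages=50)
#   memory.add_message(msg)            # oldest entries are evicted past the cap
#   memory.get_recent_messages(5)      # last five messages, oldest first
#   memory.update_context('topic', 'remote work')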
class BaseAgent:
"""Base class for all AI agents in the system."""
def __init__(self, name: str, role: AgentRole, llm: Optional[ChatOpenAI] = None):
self.name = name
self.role = role
self.llm = llm
self.memory = AgentMemory()
self.active = True
self.current_task: Optional[Task] = None
self.completed_tasks: List[Task] = []
self.performance_tracker = PerformanceTracker()
self.collaboration_partners: Set[str] = set()
async def process_task(self, task: Task) -> Task:
"""Process a task and return the result."""
self.current_task = task
task.status = TaskStatus.IN_PROGRESS
task.assigned_to = self.name
start_time = datetime.now()
try:
if self.llm:
result = await self._execute_task(task)
else:
result = await self._simulate_task_execution(task)
task.result = result
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
task.confidence = self._calculate_confidence(result)
execution_time = (task.completed_at - start_time).total_seconds()
task.performance_metrics['execution_time'] = execution_time
task.performance_metrics['confidence'] = task.confidence
self.completed_tasks.append(task)
self.performance_tracker.record_task_completion(task)
except Exception as e:
logger.error(f"Agent {self.name} failed to process task {task.id}: {str(e)}")
task.status = TaskStatus.FAILED
task.result = f"Error: {str(e)}"
task.confidence = 0.0
finally:
self.current_task = None
return task
async def _execute_task(self, task: Task) -> Any:
"""Execute the task - to be implemented by subclasses."""
raise NotImplementedError("Subclasses must implement _execute_task")
async def _simulate_task_execution(self, task: Task) -> Any:
"""Simulate task execution for demo mode."""
await asyncio.sleep(np.random.uniform(1, 3))
simulation_templates = {
AgentRole.RESEARCHER: {
"findings": f"Research on '{task.description}' reveals multiple perspectives and data points.",
"sources": ["Academic studies", "Industry reports", "Expert interviews"],
"key_points": [
"Significant trends identified in the domain",
"Multiple stakeholder perspectives considered",
"Historical context provides important insights"
]
},
AgentRole.ANALYST: {
"analysis": f"Analysis of '{task.description}' shows clear patterns and relationships.",
"patterns": [{"description": "Trend identified", "type": "trend"}],
"insights": ["Data suggests strong correlations", "Multiple factors influence outcomes"],
"confidence_metrics": {"overall_confidence": 0.85}
},
AgentRole.CRITIC: {
"evaluation": f"Critical evaluation of '{task.description}' identifies strengths and areas for improvement.",
"strengths": ["Comprehensive approach", "Well-structured analysis"],
"gaps": ["Additional data needed", "Alternative perspectives to consider"],
"quality_score": {"overall": 0.78}
},
AgentRole.SYNTHESIZER: {
"synthesis": f"Synthesis of findings on '{task.description}' provides integrated insights.",
"key_themes": [{"theme": "Theme 1", "description": "Central finding"}],
"final_recommendations": ["Strategic recommendation 1", "Strategic recommendation 2"],
"executive_summary": "Comprehensive analysis reveals actionable insights."
}
}
return simulation_templates.get(self.role, {"result": "Task completed"})
def _calculate_confidence(self, result: Any) -> float:
"""Calculate confidence score for the result."""
base_confidence = 0.5
if result and isinstance(result, dict):
if 'confidence_metrics' in result:
return result['confidence_metrics'].get('overall_confidence', base_confidence)
expected_keys = {'findings', 'analysis', 'evaluation', 'synthesis'}
actual_keys = set(result.keys())
completeness = len(actual_keys.intersection(expected_keys)) / len(expected_keys)
content_length = sum(len(str(v)) for v in result.values() if isinstance(v, (str, list)))
length_factor = min(1.0, content_length / 500)
confidence = base_confidence + (completeness * 0.3) + (length_factor * 0.2)
return min(0.95, confidence)
return base_confidence
async def collaborate(self, other_agent: 'BaseAgent', message: AgentMessage) -> AgentMessage:
"""Handle collaboration with another agent."""
self.memory.add_message(message)
self.collaboration_partners.add(other_agent.name)
self.performance_tracker.record_collaboration()
response_content = await self._process_collaboration(message)
response_message = AgentMessage(
sender=self.name,
recipient=other_agent.name,
content=response_content,
message_type=MessageType.INFORMATION_SHARING,
priority=message.priority
)
other_agent.memory.add_message(response_message)
self.performance_tracker.record_message()
return response_message
async def _process_collaboration(self, message: AgentMessage) -> str:
"""Process collaboration message - to be implemented by subclasses."""
return f"{self.name} acknowledges collaboration request from {message.sender}"
def get_status_summary(self) -> Dict[str, Any]:
"""Get current status summary of the agent."""
return {
'name': self.name,
'role': self.role.value,
'active': self.active,
'current_task': self.current_task.description if self.current_task else "None",
'completed_tasks': len(self.completed_tasks),
'average_confidence': np.mean([t.confidence for t in self.completed_tasks]) if self.completed_tasks else 0,
'collaboration_count': len(self.collaboration_partners),
'memory_size': len(self.memory.messages)
}
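# Subclassing sketch (hypothetical role, for illustration only). A concrete agent
# overrides _execute_task; without an LLM, BaseAgent falls back to
# _simulate_task_execution, so new roles still work in demo mode:
#
#   class PlannerAgent(BaseAgent):
#       async def _execute_task(self, task: Task) -> Any:
#           ...  # invoke self.llm and return a structured result dict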
class ResearcherAgent(BaseAgent):
"""Agent specialized in researching and gathering information."""
def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.RESEARCHER, llm)
self.research_sources: List[str] = []
self.research_methods = ["literature_review", "data_collection", "expert_consultation", "field_research"]
async def _execute_task(self, task: Task) -> Any:
"""Execute research task."""
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""You are a Research Agent specializing in gathering comprehensive information.
Your role is to:
1. Break down complex topics into research questions
2. Identify key information sources and data points
3. Provide detailed, factual information with citations where possible
4. Flag areas requiring further investigation
5. Maintain objectivity and consider multiple perspectives"""),
HumanMessage(content=f"Research the following: {task.description}")
])
response = await self.llm.ainvoke(prompt.format_messages())
research_result = {
"findings": response.content,
"sources": self._extract_sources(response.content),
"key_points": self._extract_key_points(response.content),
"areas_for_investigation": self._identify_gaps(response.content),
"research_quality_score": self._assess_research_quality(response.content)
}
self.memory.update_context('latest_research', research_result)
return research_result
def _extract_sources(self, content: str) -> List[str]:
"""Extract potential sources from research content."""
sources = []
source_indicators = ['source:', 'reference:', 'based on:', 'according to', 'study:', 'report:']
lines = content.split('\n')
for line in lines:
line_lower = line.lower()
for indicator in source_indicators:
if indicator in line_lower:
sources.append(line.strip())
break
return sources[:10]
def _extract_key_points(self, content: str) -> List[str]:
"""Extract key points from research."""
key_points = []
lines = content.split('\n')
for line in lines:
line = line.strip()
if line and (line[0].isdigit() or line.startswith('-') or line.startswith('•')):
key_points.append(line)
elif any(phrase in line.lower() for phrase in ['key finding:', 'important:', 'notably:']):
key_points.append(line)
return key_points[:15]
def _identify_gaps(self, content: str) -> List[str]:
"""Identify areas needing more research."""
gaps = []
gap_indicators = ['unclear', 'requires further', 'need more', 'investigate',
'unknown', 'limited data', 'insufficient evidence']
sentences = content.split('.')
for sentence in sentences:
sentence_lower = sentence.lower()
if any(indicator in sentence_lower for indicator in gap_indicators):
gaps.append(sentence.strip() + '.')
return gaps[:5]
def _assess_research_quality(self, content: str) -> float:
"""Assess the quality of research output."""
quality_score = 0.5
if self._extract_sources(content):
quality_score += 0.15
if self._extract_key_points(content):
quality_score += 0.15
word_count = len(content.split())
if word_count > 300:
quality_score += 0.1
analytical_terms = ['analysis', 'evaluation', 'comparison', 'contrast', 'implication']
if any(term in content.lower() for term in analytical_terms):
quality_score += 0.1
return min(1.0, quality_score)
class AnalystAgent(BaseAgent):
"""Agent specialized in analyzing data and identifying patterns."""
def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.ANALYST, llm)
self.analysis_methods = ["statistical", "comparative", "trend", "causal", "predictive"]
self.analysis_frameworks = ["SWOT", "PESTLE", "Porter's Five Forces", "Cost-Benefit"]
async def _execute_task(self, task: Task) -> Any:
"""Execute analysis task."""
context = self._get_relevant_context(task)
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""You are an Analyst Agent specializing in data analysis and pattern recognition.
Your role is to:
1. Analyze information systematically and objectively
2. Identify patterns, trends, and correlations
3. Provide quantitative insights where possible
4. Draw logical conclusions based on evidence
5. Apply appropriate analytical frameworks
6. Consider multiple analytical perspectives"""),
HumanMessage(content=f"Analyze the following: {task.description}\n\nContext: {context}")
])
response = await self.llm.ainvoke(prompt.format_messages())
analysis_result = {
"analysis": response.content,
"patterns": self._identify_patterns(response.content),
"insights": self._extract_insights(response.content),
"recommendations": self._generate_recommendations(response.content),
"confidence_metrics": self._calculate_analysis_confidence(response.content),
"analytical_framework": self._identify_framework_used(response.content)
}
self.memory.update_context('latest_analysis', analysis_result)
return analysis_result
def _get_relevant_context(self, task: Task) -> str:
"""Get relevant context from memory for the task."""
context_items = []
recent_messages = self.memory.get_recent_messages(5)
for msg in recent_messages:
if task.description.lower() in msg.content.lower():
context_items.append(f"Previous finding: {msg.content[:200]}...")
for key, knowledge in self.memory.knowledge_base.items():
if 'finding' in knowledge['type'] or 'insight' in knowledge['type']:
context_items.append(f"Known insight: {knowledge['content'][:200]}...")
return "\n".join(context_items[:3])
def _identify_patterns(self, content: str) -> List[Dict[str, str]]:
"""Identify patterns in the analysis."""
patterns = []
pattern_types = {
'trend': ['trend', 'increasing', 'decreasing', 'growth', 'decline'],
'correlation': ['correlation', 'relationship', 'associated', 'linked'],
'cyclical': ['cycle', 'periodic', 'seasonal', 'recurring'],
'anomaly': ['anomaly', 'outlier', 'unusual', 'exceptional']
}
sentences = content.split('.')
for sentence in sentences:
sentence_lower = sentence.lower()
for pattern_type, keywords in pattern_types.items():
if any(keyword in sentence_lower for keyword in keywords):
patterns.append({
"description": sentence.strip() + '.',
"type": pattern_type,
"confidence": 0.8
})
break
return patterns[:8]
def _extract_insights(self, content: str) -> List[str]:
"""Extract key insights from analysis."""
insights = []
insight_indicators = ['shows', 'indicates', 'suggests', 'reveals',
'demonstrates', 'implies', 'means that', 'therefore']
sentences = content.split('.')
for sentence in sentences:
sentence_lower = sentence.lower()
if any(indicator in sentence_lower for indicator in insight_indicators):
insights.append(sentence.strip() + '.')
return insights[:10]
def _generate_recommendations(self, content: str) -> List[Dict[str, str]]:
"""Generate recommendations based on analysis."""
recommendations = []
rec_indicators = ['recommend', 'suggest', 'should', 'consider',
'advise', 'propose', 'it would be beneficial']
sentences = content.split('.')
for sentence in sentences:
sentence_lower = sentence.lower()
if any(indicator in sentence_lower for indicator in rec_indicators):
recommendations.append(sentence.strip() + '.')
prioritized = []
for rec in recommendations:
priority = "high" if any(word in rec.lower() for word in ['critical', 'essential', 'must']) else "medium"
prioritized.append({"recommendation": rec, "priority": priority})
return prioritized[:7]
def _calculate_analysis_confidence(self, content: str) -> Dict[str, float]:
"""Calculate confidence metrics for the analysis."""
evidence_count = sum(content.lower().count(word) for word in ['evidence', 'data', 'shows', 'proves'])
uncertainty_count = sum(content.lower().count(word) for word in ['may', 'might', 'possibly', 'perhaps'])
evidence_strength = min(1.0, evidence_count / 10)
certainty_level = max(0.0, 1.0 - (uncertainty_count / 10))
quantitative_indicators = ['percentage', '%', 'ratio', 'correlation', 'statistical']
quantitative_score = 0.7 if any(ind in content.lower() for ind in quantitative_indicators) else 0.5
overall_confidence = (evidence_strength + certainty_level + quantitative_score) / 3
return {
"overall_confidence": overall_confidence,
"evidence_strength": evidence_strength,
"certainty_level": certainty_level,
"quantitative_score": quantitative_score
}
def _identify_framework_used(self, content: str) -> Optional[str]:
"""Identify which analytical framework was used."""
content_lower = content.lower()
for framework in self.analysis_frameworks:
if framework.lower() in content_lower:
return framework
if all(word in content_lower for word in ['strength', 'weakness', 'opportunity', 'threat']):
return "SWOT"
elif any(word in content_lower for word in ['political', 'economic', 'social', 'technological']):
return "PESTLE"
return None
class CriticAgent(BaseAgent):
"""Agent specialized in critical evaluation and quality assurance."""
def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.CRITIC, llm)
self.evaluation_criteria = [
"accuracy", "completeness", "logic", "evidence",
"clarity", "relevance", "consistency", "objectivity"
]
self.evaluation_rubric = self._create_evaluation_rubric()
def _create_evaluation_rubric(self) -> Dict[str, Dict[str, float]]:
"""Create evaluation rubric with weighted criteria."""
return {
"accuracy": {"weight": 0.20, "score": 0.0},
"completeness": {"weight": 0.15, "score": 0.0},
"logic": {"weight": 0.15, "score": 0.0},
"evidence": {"weight": 0.15, "score": 0.0},
"clarity": {"weight": 0.10, "score": 0.0},
"relevance": {"weight": 0.10, "score": 0.0},
"consistency": {"weight": 0.10, "score": 0.0},
"objectivity": {"weight": 0.05, "score": 0.0}
}
async def _execute_task(self, task: Task) -> Any:
"""Execute critical evaluation task."""
evaluation_context = self._gather_evaluation_context(task)
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""You are a Critic Agent specializing in rigorous evaluation and quality assurance.
Your role is to:
1. Critically evaluate arguments and conclusions
2. Identify weaknesses, gaps, and potential biases
3. Verify logical consistency and evidence quality
4. Suggest improvements and alternative perspectives
5. Ensure high standards of analysis
6. Apply systematic evaluation criteria
7. Provide constructive feedback"""),
HumanMessage(content=f"Critically evaluate the following: {task.description}\n\nContent to evaluate: {evaluation_context}")
])
response = await self.llm.ainvoke(prompt.format_messages())
critique_result = {
"evaluation": response.content,
"strengths": self._identify_strengths(response.content),
"weaknesses": self._identify_weaknesses(response.content),
"gaps": self._identify_gaps(response.content),
"improvements": self._suggest_improvements(response.content),
"quality_score": self._calculate_quality_score(response.content),
"alternative_perspectives": self._identify_alternatives(response.content),
"final_verdict": self._generate_verdict(response.content)
}
self.memory.update_context('evaluation_history', critique_result)
return critique_result
def _gather_evaluation_context(self, task: Task) -> str:
"""Gather relevant context for evaluation."""
context_items = []
recent_messages = self.memory.get_recent_messages(10)
for msg in recent_messages:
if msg.message_type in [MessageType.COMPLETION_REPORT, MessageType.INFORMATION_SHARING]:
context_items.append(f"{msg.sender}: {msg.content[:300]}...")
for key, knowledge in self.memory.knowledge_base.items():
if knowledge['type'] in ['finding', 'conclusion', 'insight']:
context_items.append(f"Previous {knowledge['type']}: {knowledge['content'][:200]}...")
return "\n\n".join(context_items[:5])
def _identify_strengths(self, content: str) -> List[Dict[str, str]]:
"""Identify strengths in the evaluated content."""
strengths = []
strength_indicators = ['strong', 'excellent', 'well', 'good', 'effective',
'solid', 'robust', 'comprehensive', 'thorough']
sentences = content.split('.')
for sentence in sentences:
sentence_lower = sentence.lower()
for indicator in strength_indicators:
if indicator in sentence_lower:
strengths.append({
"strength": sentence.strip() + '.',
"category": self._categorize_strength(sentence),
"impact": "high" if any(word in sentence_lower for word in ['very', 'extremely', 'highly']) else "medium"
})
break
return strengths[:6]
def _categorize_strength(self, sentence: str) -> str:
"""Categorize the type of strength identified."""
sentence_lower = sentence.lower()
if any(word in sentence_lower for word in ['method', 'approach', 'framework']):
return "methodology"
elif any(word in sentence_lower for word in ['data', 'evidence', 'support']):
return "evidence"
elif any(word in sentence_lower for word in ['logic', 'reasoning', 'argument']):
return "reasoning"
elif any(word in sentence_lower for word in ['clear', 'organized', 'structured']):
return "presentation"
else:
return "general"
def _identify_weaknesses(self, content: str) -> List[Dict[str, str]]:
"""Identify weaknesses in the evaluated content."""
weaknesses = []
weakness_indicators = ['weak', 'lack', 'insufficient', 'poor', 'inadequate',
'missing', 'limited', 'unclear', 'vague']
sentences = content.split('.')
for sentence in sentences:
sentence_lower = sentence.lower()
for indicator in weakness_indicators:
if indicator in sentence_lower:
weaknesses.append({
"weakness": sentence.strip() + '.',
"severity": self._assess_severity(sentence),
"category": self._categorize_weakness(sentence)
})
break
return weaknesses[:6]
def _assess_severity(self, sentence: str) -> str:
"""Assess the severity of a weakness."""
sentence_lower = sentence.lower()
if any(word in sentence_lower for word in ['critical', 'severe', 'major', 'significant']):
return "high"
elif any(word in sentence_lower for word in ['moderate', 'some', 'partial']):
return "medium"
else:
return "low"
def _categorize_weakness(self, sentence: str) -> str:
"""Categorize the type of weakness identified."""
sentence_lower = sentence.lower()
if any(word in sentence_lower for word in ['data', 'evidence', 'support']):
return "evidence"
elif any(word in sentence_lower for word in ['logic', 'reasoning', 'argument']):
return "reasoning"
elif any(word in sentence_lower for word in ['bias', 'objective', 'neutral']):
return "objectivity"
elif any(word in sentence_lower for word in ['complete', 'comprehensive', 'thorough']):
return "completeness"
else:
return "general"
def _identify_gaps(self, content: str) -> List[str]:
"""Identify gaps in the analysis."""
gaps = []
gap_indicators = ['gap', 'missing', 'overlook', 'fail to', 'does not address',
'ignores', 'omits', 'neglects']
sentences = content.split('.')
for sentence in sentences:
sentence_lower = sentence.lower()
if any(indicator in sentence_lower for indicator in gap_indicators):
gaps.append(sentence.strip() + '.')
return gaps[:5]
def _suggest_improvements(self, content: str) -> List[Dict[str, str]]:
"""Suggest improvements based on critique."""
improvements = []
improvement_indicators = ['could', 'should', 'improve', 'enhance',
'strengthen', 'add', 'consider', 'recommend']
sentences = content.split('.')
for sentence in sentences:
sentence_lower = sentence.lower()
if any(indicator in sentence_lower for indicator in improvement_indicators):
improvements.append({
"suggestion": sentence.strip() + '.',
"priority": self._prioritize_improvement(sentence),
"effort": self._estimate_effort(sentence)
})
return improvements[:7]
def _prioritize_improvement(self, sentence: str) -> str:
"""Prioritize improvement suggestions."""
sentence_lower = sentence.lower()
if any(word in sentence_lower for word in ['critical', 'essential', 'must', 'urgent']):
return "high"
elif any(word in sentence_lower for word in ['should', 'important', 'recommend']):
return "medium"
else:
return "low"
def _estimate_effort(self, sentence: str) -> str:
"""Estimate effort required for improvement."""
sentence_lower = sentence.lower()
if any(word in sentence_lower for word in ['simple', 'easy', 'quick', 'minor']):
return "low"
elif any(word in sentence_lower for word in ['moderate', 'some', 'reasonable']):
return "medium"
elif any(word in sentence_lower for word in ['significant', 'substantial', 'major']):
return "high"
else:
return "medium"
def _calculate_quality_score(self, content: str) -> Dict[str, Any]:
"""Calculate detailed quality scores."""
        # Copy the nested per-criterion dicts so scoring does not mutate the shared rubric template.
        scores = {criterion: dict(values) for criterion, values in self.evaluation_rubric.items()}
content_lower = content.lower()
for criterion in self.evaluation_criteria:
score = 0.5
if criterion in content_lower and any(word in content_lower for word in ['good', 'strong', 'excellent']):
score += 0.3
if criterion in content_lower and any(word in content_lower for word in ['poor', 'weak', 'lacking']):
score -= 0.3
scores[criterion]["score"] = max(0.0, min(1.0, score))
overall = sum(scores[c]["score"] * scores[c]["weight"] for c in scores)
return {
"overall": overall,
"breakdown": {c: scores[c]["score"] for c in scores},
"grade": self._convert_to_grade(overall)
}
def _convert_to_grade(self, score: float) -> str:
"""Convert numeric score to letter grade."""
if score >= 0.9:
return "A"
elif score >= 0.8:
return "B"
elif score >= 0.7:
return "C"
elif score >= 0.6:
return "D"
else:
return "F"
def _identify_alternatives(self, content: str) -> List[str]:
"""Identify alternative perspectives mentioned."""
alternatives = []
alternative_indicators = ['alternatively', 'another perspective', 'different approach',
'could also', 'different view', 'alternative']
sentences = content.split('.')
for sentence in sentences:
sentence_lower = sentence.lower()
if any(indicator in sentence_lower for indicator in alternative_indicators):
alternatives.append(sentence.strip() + '.')
return alternatives[:4]
def _generate_verdict(self, content: str) -> Dict[str, str]:
"""Generate final verdict based on evaluation."""
positive_count = sum(content.lower().count(word) for word in ['good', 'strong', 'excellent', 'effective'])
negative_count = sum(content.lower().count(word) for word in ['poor', 'weak', 'lacking', 'insufficient'])
if positive_count > negative_count * 2:
verdict = "Approved with minor revisions"
confidence = "high"
elif positive_count > negative_count:
verdict = "Approved with moderate revisions"
confidence = "medium"
else:
verdict = "Requires significant improvements"
confidence = "medium"
return {
"verdict": verdict,
"confidence": confidence,
"summary": "Based on comprehensive evaluation across multiple criteria."
}
class SynthesizerAgent(BaseAgent):
"""Agent specialized in synthesizing information and creating coherent narratives."""
def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.SYNTHESIZER, llm)
self.synthesis_strategies = ["integrate", "summarize", "reconcile", "consolidate", "harmonize"]
self.output_formats = ["executive_summary", "detailed_report", "action_plan", "strategic_recommendation"]
async def _execute_task(self, task: Task) -> Any:
"""Execute synthesis task."""
synthesis_input = self._gather_synthesis_input(task)
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""You are a Synthesizer Agent specializing in integrating diverse information.
Your role is to:
1. Combine multiple perspectives into coherent narratives
2. Resolve contradictions and find common ground
3. Create comprehensive summaries that capture key insights
4. Generate actionable conclusions and recommendations
5. Ensure clarity and accessibility of complex information
6. Prioritize information based on relevance and impact
7. Create structured outputs suitable for decision-making"""),
HumanMessage(content=f"Synthesize the following information: {task.description}\n\nInput data: {synthesis_input}")
])
response = await self.llm.ainvoke(prompt.format_messages())
synthesis_result = {
"synthesis": response.content,
"key_themes": self._extract_themes(response.content),
"consensus_points": self._identify_consensus(response.content),
"contradictions": self._identify_contradictions(response.content),
"final_recommendations": self._generate_final_recommendations(response.content),
"executive_summary": self._create_executive_summary(response.content),
"action_items": self._extract_action_items(response.content),
"confidence_level": self._assess_synthesis_confidence(response.content)
}
self.memory.update_context('latest_synthesis', synthesis_result)
return synthesis_result
def _gather_synthesis_input(self, task: Task) -> str:
"""Gather all relevant information for synthesis."""
input_sections = []
agent_findings = {}
for msg in self.memory.get_recent_messages(20):
if msg.sender not in agent_findings:
agent_findings[msg.sender] = []
agent_findings[msg.sender].append(msg.content[:500])
for agent, findings in agent_findings.items():
if findings:
input_sections.append(f"\n{agent} Contributions:\n" + "\n".join(findings[:3]))
knowledge_items = []
for key, knowledge in self.memory.knowledge_base.items():
knowledge_items.append(f"{knowledge['type'].title()}: {knowledge['content'][:200]}...")
if knowledge_items:
input_sections.append("\nKnowledge Base:\n" + "\n".join(knowledge_items[:5]))
return "\n".join(input_sections)
def _extract_themes(self, content: str) -> List[Dict[str, Any]]:
"""Extract major themes from synthesis."""
themes = []
theme_indicators = ['theme', 'pattern', 'trend', 'common', 'recurring',
'central', 'key finding', 'main point']
paragraphs = content.split('\n\n')
theme_count = 0
for paragraph in paragraphs:
paragraph_lower = paragraph.lower()
if any(indicator in paragraph_lower for indicator in theme_indicators):
theme_count += 1
themes.append({
"theme": f"Theme {theme_count}",
"description": paragraph.strip()[:300] + "..." if len(paragraph) > 300 else paragraph.strip(),
"importance": self._assess_theme_importance(paragraph),
"support_level": self._assess_support_level(paragraph)
})
if not themes and paragraphs:
for i, paragraph in enumerate(paragraphs[:5]):
if len(paragraph.strip()) > 50:
themes.append({
"theme": f"Finding {i+1}",
"description": paragraph.strip()[:300] + "..." if len(paragraph) > 300 else paragraph.strip(),
"importance": "medium",
"support_level": "moderate"
})
return themes[:6]
def _assess_theme_importance(self, content: str) -> str:
"""Assess the importance of a theme."""
content_lower = content.lower()
high_importance_indicators = ['critical', 'essential', 'fundamental', 'crucial', 'vital']
if any(indicator in content_lower for indicator in high_importance_indicators):
return "high"
low_importance_indicators = ['minor', 'secondary', 'marginal', 'peripheral']
if any(indicator in content_lower for indicator in low_importance_indicators):
return "low"
return "medium"
def _assess_support_level(self, content: str) -> str:
"""Assess the level of support for a theme."""
content_lower = content.lower()
strong_support = ['consensus', 'unanimous', 'clear evidence', 'strongly supported']
if any(indicator in content_lower for indicator in strong_support):
return "strong"
weak_support = ['limited evidence', 'some indication', 'preliminary', 'tentative']
if any(indicator in content_lower for indicator in weak_support):
return "weak"
return "moderate"
def _identify_consensus(self, content: str) -> List[Dict[str, str]]:
"""Identify points of consensus."""
consensus_points = []
consensus_indicators = ['agree', 'consensus', 'common', 'shared', 'unanimous',
'consistent', 'alignment', 'convergence']
sentences = content.split('.')
for sentence in sentences:
sentence_lower = sentence.lower()
if any(indicator in sentence_lower for indicator in consensus_indicators):
consensus_points.append({
"point": sentence.strip() + '.',
"strength": "strong" if "unanimous" in sentence_lower or "clear consensus" in sentence_lower else "moderate"
})
return consensus_points[:6]
def _identify_contradictions(self, content: str) -> List[Dict[str, str]]:
"""Identify contradictions or conflicts."""
contradictions = []
conflict_indicators = ['however', 'contrary', 'conflict', 'disagree', 'opposing',
'contradicts', 'tension', 'divergent', 'inconsistent']
sentences = content.split('.')
for sentence in sentences:
sentence_lower = sentence.lower()
if any(indicator in sentence_lower for indicator in conflict_indicators):
contradictions.append({
"contradiction": sentence.strip() + '.',
"resolution_suggested": self._check_for_resolution(sentence),
"impact": self._assess_contradiction_impact(sentence)
})
return contradictions[:4]
def _check_for_resolution(self, sentence: str) -> bool:
"""Check if a resolution is suggested for the contradiction."""
resolution_indicators = ['can be resolved', 'reconcile', 'bridge', 'common ground', 'compromise']
return any(indicator in sentence.lower() for indicator in resolution_indicators)
def _assess_contradiction_impact(self, sentence: str) -> str:
"""Assess the impact of a contradiction."""
sentence_lower = sentence.lower()
if any(word in sentence_lower for word in ['fundamental', 'major', 'significant']):
return "high"
elif any(word in sentence_lower for word in ['minor', 'small', 'slight']):
return "low"
else:
return "medium"
def _generate_final_recommendations(self, content: str) -> List[Dict[str, Any]]:
"""Generate final synthesized recommendations."""
recommendations = []
rec_indicators = ['recommend', 'suggest', 'propose', 'advise', 'should', 'must']
sentences = content.split('.')
for sentence in sentences:
sentence_lower = sentence.lower()
if any(indicator in sentence_lower for indicator in rec_indicators):
recommendations.append({
"recommendation": sentence.strip() + '.',
"priority": self._determine_priority(sentence),
"timeframe": self._determine_timeframe(sentence),
"category": self._categorize_recommendation(sentence)
})
priority_order = {"high": 0, "medium": 1, "low": 2}
recommendations.sort(key=lambda x: priority_order.get(x["priority"], 3))
return recommendations[:8]
def _determine_priority(self, sentence: str) -> str:
"""Determine recommendation priority."""
sentence_lower = sentence.lower()
if any(word in sentence_lower for word in ['urgent', 'immediate', 'critical', 'must']):
return "high"
elif any(word in sentence_lower for word in ['should', 'important', 'recommend']):
return "medium"
else:
return "low"
def _determine_timeframe(self, sentence: str) -> str:
"""Determine recommendation timeframe."""
sentence_lower = sentence.lower()
if any(word in sentence_lower for word in ['immediate', 'now', 'urgent', 'asap']):
return "immediate"
elif any(word in sentence_lower for word in ['short-term', 'soon', 'near']):
return "short-term"
elif any(word in sentence_lower for word in ['long-term', 'future', 'eventually']):
return "long-term"
else:
return "medium-term"
def _categorize_recommendation(self, sentence: str) -> str:
"""Categorize the type of recommendation."""
sentence_lower = sentence.lower()
if any(word in sentence_lower for word in ['strategy', 'strategic', 'plan']):
return "strategic"
elif any(word in sentence_lower for word in ['operational', 'process', 'procedure']):
return "operational"
elif any(word in sentence_lower for word in ['tactical', 'action', 'implement']):
return "tactical"
else:
return "general"
def _create_executive_summary(self, content: str) -> str:
"""Create an executive summary of the synthesis."""
summary_parts = []
paragraphs = content.split('\n\n')
if paragraphs:
opening = paragraphs[0][:200]
if len(paragraphs[0]) > 200:
opening += "..."
summary_parts.append(opening)
key_finding_indicators = ['key finding', 'main conclusion', 'importantly', 'notably']
for paragraph in paragraphs[1:]:
if any(indicator in paragraph.lower() for indicator in key_finding_indicators):
summary_parts.append(paragraph[:150] + "..." if len(paragraph) > 150 else paragraph)
if len(summary_parts) >= 3:
break
if len(paragraphs) > 1:
conclusion = paragraphs[-1][:150]
if conclusion not in summary_parts:
summary_parts.append(conclusion + "..." if len(paragraphs[-1]) > 150 else conclusion)
return " ".join(summary_parts)
def _extract_action_items(self, content: str) -> List[Dict[str, str]]:
"""Extract specific action items from synthesis."""
action_items = []
action_indicators = ['action:', 'task:', 'todo:', 'action item', 'next step', 'to do']
lines = content.split('\n')
for line in lines:
line_lower = line.lower()
if any(indicator in line_lower for indicator in action_indicators):
action_items.append({
"action": line.strip(),
"owner": "TBD",
"deadline": "TBD",
"status": "pending"
})
elif line.strip() and line.strip()[0].isdigit() and 'action' in line_lower:
action_items.append({
"action": line.strip(),
"owner": "TBD",
"deadline": "TBD",
"status": "pending"
})
return action_items[:10]
def _assess_synthesis_confidence(self, content: str) -> Dict[str, Any]:
"""Assess confidence in the synthesis."""
word_count = len(content.split())
high_confidence_words = ['clear', 'strong', 'definitive', 'conclusive', 'certain']
low_confidence_words = ['uncertain', 'unclear', 'tentative', 'preliminary', 'limited']
high_conf_count = sum(content.lower().count(word) for word in high_confidence_words)
low_conf_count = sum(content.lower().count(word) for word in low_confidence_words)
base_confidence = 0.7
confidence_adjustment = (high_conf_count * 0.05) - (low_conf_count * 0.08)
overall_confidence = max(0.3, min(0.95, base_confidence + confidence_adjustment))
return {
"overall": overall_confidence,
"level": "high" if overall_confidence > 0.8 else "medium" if overall_confidence > 0.6 else "low",
"factors": {
"content_depth": word_count > 500,
"evidence_strength": high_conf_count > low_conf_count,
"consensus_level": "consensus" in content.lower()
}
}
class CoordinatorAgent(BaseAgent):
"""Agent responsible for coordinating other agents and managing workflow."""
def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.COORDINATOR, llm)
self.agents: Dict[str, BaseAgent] = {}
self.task_queue: List[Task] = []
self.completed_tasks: List[Task] = []
self.workflow_graph = nx.DiGraph()
self.execution_history: List[Dict[str, Any]] = []
self.workflow_templates = self._create_workflow_templates()
def _create_workflow_templates(self) -> Dict[str, List[Dict[str, Any]]]:
"""Create predefined workflow templates for common problem types."""
return {
"research_analysis": [
{"role": "Researcher", "task": "Gather comprehensive information"},
{"role": "Analyst", "task": "Analyze findings and identify patterns"},
{"role": "Critic", "task": "Evaluate analysis quality"},
{"role": "Synthesizer", "task": "Create final recommendations"}
],
"strategic_planning": [
{"role": "Researcher", "task": "Research current state and trends"},
{"role": "Analyst", "task": "SWOT analysis and opportunity identification"},
{"role": "Researcher", "task": "Benchmark best practices"},
{"role": "Critic", "task": "Risk assessment and gap analysis"},
{"role": "Synthesizer", "task": "Strategic plan synthesis"}
],
"problem_solving": [
{"role": "Researcher", "task": "Define problem and gather context"},
{"role": "Analyst", "task": "Root cause analysis"},
{"role": "Researcher", "task": "Research potential solutions"},
{"role": "Critic", "task": "Evaluate solution feasibility"},
{"role": "Synthesizer", "task": "Recommend optimal solution"}
]
}
def register_agent(self, agent: BaseAgent):
"""Register an agent with the coordinator."""
self.agents[agent.name] = agent
self.workflow_graph.add_node(agent.name, role=agent.role.value)
logger.info(f"Registered agent: {agent.name} with role {agent.role.value}")
async def decompose_problem(self, problem: str, use_template: bool = False) -> List[Task]:
"""Decompose a complex problem into subtasks."""
if use_template:
template_tasks = self._match_problem_to_template(problem)
if template_tasks:
return template_tasks
if self.llm:
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""You are a Coordinator Agent responsible for breaking down complex problems.
Decompose the problem into specific subtasks that can be assigned to specialized agents:
- Researcher: For gathering information and facts
- Analyst: For analyzing data and identifying patterns
- Critic: For evaluating quality and identifying issues
- Synthesizer: For combining insights and creating summaries
Create 4-8 clear, actionable subtasks with dependencies.
Format each task as: [Role]: [Specific task description]"""),
HumanMessage(content=f"Decompose this problem into subtasks: {problem}")
])
response = await self.llm.ainvoke(prompt.format_messages())
tasks = self._parse_tasks(response.content, problem)
else:
tasks = self._create_default_tasks(problem)
        for task in tasks:
task.metadata['problem_complexity'] = self._assess_problem_complexity(problem)
task.metadata['estimated_duration'] = self._estimate_task_duration(task)
return tasks
def _match_problem_to_template(self, problem: str) -> Optional[List[Task]]:
"""Match problem to a workflow template."""
problem_lower = problem.lower()
if any(word in problem_lower for word in ['strategy', 'strategic', 'plan']):
template_name = "strategic_planning"
elif any(word in problem_lower for word in ['research', 'analyze', 'investigate']):
template_name = "research_analysis"
elif any(word in problem_lower for word in ['problem', 'solve', 'solution', 'fix']):
template_name = "problem_solving"
else:
return None
template = self.workflow_templates[template_name]
tasks = []
for i, step in enumerate(template):
task = Task(
id=f"task_{i+1}",
description=f"{step['task']} for: {problem}",
metadata={
"original_problem": problem,
"suggested_role": step['role'],
"template": template_name
}
)
tasks.append(task)
return tasks
def _assess_problem_complexity(self, problem: str) -> str:
"""Assess the complexity of a problem."""
complexity_indicators = {
"high": ['multiple', 'complex', 'comprehensive', 'strategic', 'long-term'],
"medium": ['analyze', 'evaluate', 'develop', 'assess'],
"low": ['simple', 'basic', 'straightforward', 'identify']
}
problem_lower = problem.lower()
for level, indicators in complexity_indicators.items():
if any(indicator in problem_lower for indicator in indicators):
return level
return "high" if len(problem) > 200 else "medium"
def _estimate_task_duration(self, task: Task) -> float:
"""Estimate task duration in seconds."""
base_duration = 30.0
role_multipliers = {
"Researcher": 1.2,
"Analyst": 1.5,
"Critic": 1.0,
"Synthesizer": 1.3
}
role = task.metadata.get("suggested_role", "Researcher")
duration = base_duration * role_multipliers.get(role, 1.0)
complexity = task.metadata.get("problem_complexity", "medium")
if complexity == "high":
duration *= 1.5
elif complexity == "low":
duration *= 0.7
return duration
def _parse_tasks(self, content: str, original_problem: str) -> List[Task]:
"""Parse LLM response into Task objects."""
tasks = []
lines = content.split('\n')
task_id = 1
current_role = None
for line in lines:
line = line.strip()
if not line:
continue
role_found = False
for role in AgentRole:
if role.value in line or role.value.lower() in line.lower():
current_role = role.value
role_found = True
break
if current_role and ':' in line:
task_parts = line.split(':', 1)
if len(task_parts) > 1:
task_desc = task_parts[1].strip()
task = Task(
id=f"task_{task_id}",
description=task_desc,
metadata={
"original_problem": original_problem,
"suggested_role": current_role,
"source": "llm_decomposition"
}
)
tasks.append(task)
task_id += 1
if len(tasks) < 4:
tasks.extend(self._create_default_tasks(original_problem)[len(tasks):])
return tasks
def _create_default_tasks(self, problem: str) -> List[Task]:
"""Create default tasks for a problem."""
return [
Task(
id="task_1",
description=f"Research comprehensive background information on: {problem}",
metadata={"suggested_role": "Researcher", "source": "default"}
),
Task(
id="task_2",
description=f"Analyze key factors and patterns related to: {problem}",
metadata={"suggested_role": "Analyst", "source": "default"}
),
Task(
id="task_3",
description="Critically evaluate the research findings and analysis quality",
metadata={"suggested_role": "Critic", "source": "default"}
),
Task(
id="task_4",
description="Synthesize all findings into actionable insights and recommendations",
metadata={"suggested_role": "Synthesizer", "source": "default"}
)
]
def _build_dependency_graph(self, tasks: List[Task]):
"""Build a dependency graph for tasks."""
role_order = {
"Researcher": 1,
"Analyst": 2,
"Critic": 3,
"Synthesizer": 4
}
sorted_tasks = sorted(tasks, key=lambda t: role_order.get(t.metadata.get("suggested_role", "Researcher"), 5))
for i in range(len(sorted_tasks) - 1):
current_task = sorted_tasks[i]
next_task = sorted_tasks[i + 1]
current_order = role_order.get(current_task.metadata.get("suggested_role"), 0)
next_order = role_order.get(next_task.metadata.get("suggested_role"), 0)
if next_order >= current_order:
next_task.dependencies.append(current_task.id)
async def execute_workflow(self, tasks: List[Task], parallel: bool = True) -> Dict[str, Any]:
"""Execute the workflow with given tasks."""
start_time = datetime.now()
self.performance_tracker.start_tracking()
self._build_dependency_graph(tasks)
self._update_workflow_graph(tasks)
try:
if parallel:
await self._execute_parallel(tasks)
else:
await self._execute_sequential(tasks)
except Exception as e:
logger.error(f"Workflow execution error: {str(e)}")
end_time = datetime.now()
self.performance_tracker.end_tracking()
execution_time = (end_time - start_time).total_seconds()
workflow_result = {
"tasks": tasks,
"execution_time": execution_time,
"success_rate": self._calculate_success_rate(tasks),
"agent_contributions": self._compile_agent_contributions(tasks),
"workflow_graph": self.workflow_graph,
"performance_metrics": self.performance_tracker.get_performance_summary(),
"timestamp": datetime.now()
}
self.execution_history.append(workflow_result)
return workflow_result
def _update_workflow_graph(self, tasks: List[Task]):
"""Update the workflow graph with task relationships."""
for task in tasks:
self.workflow_graph.add_node(
task.id,
                task_description=task.description[:50] + ("..." if len(task.description) > 50 else ""),
status=task.status.value
)
for task in tasks:
for dep_id in task.dependencies:
self.workflow_graph.add_edge(dep_id, task.id)
async def _execute_parallel(self, tasks: List[Task]) -> List[Task]:
"""Execute tasks in parallel where possible."""
completed = set()
pending = tasks.copy()
        with ThreadPoolExecutor(max_workers=max(1, len(self.agents))) as executor:
while pending:
ready_tasks = [
task for task in pending
if all(dep in completed for dep in task.dependencies)
]
                if not ready_tasks:
                    # 'pending' is non-empty inside this loop, so fall back to
                    # force-running the first pending task to avoid a deadlock.
                    logger.warning("No ready tasks found, executing first pending task")
                    ready_tasks = [pending[0]]
                future_to_task = {}
                for task in ready_tasks:
                    agent_name = self._select_agent_for_task(task)
                    if agent_name and agent_name in self.agents:
                        agent = self.agents[agent_name]
                        future = executor.submit(asyncio.run, agent.process_task(task))
                        future_to_task[future] = (task, agent_name)
                if not future_to_task:
                    # No agent could take any ready task; fail them so the loop terminates.
                    for task in ready_tasks:
                        task.status = TaskStatus.FAILED
                        pending.remove(task)
                    continue
for future in as_completed(future_to_task):
task, agent_name = future_to_task[future]
try:
completed_task = future.result()
completed.add(task.id)
pending.remove(task)
self.completed_tasks.append(completed_task)
self.workflow_graph.add_edge(
self.name, agent_name,
task_id=task.id,
timestamp=datetime.now().isoformat()
)
await self._facilitate_collaboration(completed_task, agent_name)
                    except Exception as e:
                        logger.error(f"Task {task.id} failed: {str(e)}")
                        task.status = TaskStatus.FAILED
                        # Remove the failed task so it is not resubmitted on the next pass.
                        pending.remove(task)
return tasks
async def _execute_sequential(self, tasks: List[Task]) -> List[Task]:
"""Execute tasks sequentially."""
for task in tasks:
agent_name = self._select_agent_for_task(task)
if agent_name and agent_name in self.agents:
agent = self.agents[agent_name]
completed_task = await agent.process_task(task)
self.workflow_graph.add_edge(
self.name, agent_name,
task_id=task.id,
timestamp=datetime.now().isoformat()
)
self.completed_tasks.append(completed_task)
await self._facilitate_collaboration(completed_task, agent_name)
return tasks
def _select_agent_for_task(self, task: Task) -> Optional[str]:
"""Select the best agent for a given task."""
suggested_role = task.metadata.get("suggested_role")
for agent_name, agent in self.agents.items():
if agent.role.value == suggested_role:
if agent.active and agent.current_task is None:
return agent_name
for agent_name, agent in self.agents.items():
if agent.role.value == suggested_role:
return agent_name
for agent_name, agent in self.agents.items():
if agent.active:
return agent_name
return None
async def _facilitate_collaboration(self, task: Task, agent_name: str):
"""Facilitate collaboration between agents after task completion."""
if not task.result or task.status != TaskStatus.COMPLETED:
return
collab_message = AgentMessage(
sender=agent_name,
recipient="all",
content=f"Task completed: {task.description}\nKey findings: {str(task.result)[:500]}",
message_type=MessageType.COMPLETION_REPORT,
priority=3
)
for other_agent_name, other_agent in self.agents.items():
if other_agent_name != agent_name:
if self._should_collaborate(task, other_agent):
                    # AgentMemory.add_message is synchronous, so it must not be awaited.
                    other_agent.memory.add_message(collab_message)
self.performance_tracker.record_collaboration()
def _should_collaborate(self, task: Task, agent: BaseAgent) -> bool:
"""Determine if an agent should receive collaboration message."""
if agent.role == AgentRole.SYNTHESIZER:
return True
if agent.role == AgentRole.CRITIC and task.metadata.get("suggested_role") in ["Researcher", "Analyst"]:
return True
if agent.role == AgentRole.ANALYST and task.metadata.get("suggested_role") == "Researcher":
return True
return False
def _calculate_success_rate(self, tasks: List[Task]) -> float:
"""Calculate the success rate of task execution."""
if not tasks:
return 0.0
successful = sum(1 for task in tasks if task.status == TaskStatus.COMPLETED)
return successful / len(tasks)
def _compile_agent_contributions(self, tasks: List[Task]) -> Dict[str, Any]:
"""Compile contributions from each agent."""
contributions = {}
for agent_name, agent in self.agents.items():
agent_tasks = [task for task in tasks if task.assigned_to == agent_name]
if agent_tasks:
total_execution_time = sum(
task.performance_metrics.get('execution_time', 0)
for task in agent_tasks
)
avg_confidence = np.mean([task.confidence for task in agent_tasks])
contributions[agent_name] = {
"role": agent.role.value,
"tasks_completed": len(agent_tasks),
"average_confidence": avg_confidence,
"total_execution_time": total_execution_time,
"collaboration_count": len(agent.collaboration_partners),
"status": agent.get_status_summary()
}
else:
contributions[agent_name] = {
"role": agent.role.value,
"tasks_completed": 0,
"average_confidence": 0.0,
"total_execution_time": 0.0,
"collaboration_count": 0,
"status": agent.get_status_summary()
}
return contributions
def get_workflow_insights(self) -> Dict[str, Any]:
"""Get insights about workflow execution patterns."""
if not self.execution_history:
return {"message": "No execution history available"}
total_executions = len(self.execution_history)
avg_execution_time = np.mean([wf['execution_time'] for wf in self.execution_history])
avg_success_rate = np.mean([wf['success_rate'] for wf in self.execution_history])
agent_stats = {}
for workflow in self.execution_history:
for agent, contrib in workflow['agent_contributions'].items():
if agent not in agent_stats:
agent_stats[agent] = {
'total_tasks': 0,
'total_time': 0,
'confidence_scores': []
}
agent_stats[agent]['total_tasks'] += contrib['tasks_completed']
agent_stats[agent]['total_time'] += contrib['total_execution_time']
if contrib['average_confidence'] > 0:
agent_stats[agent]['confidence_scores'].append(contrib['average_confidence'])
agent_efficiency = {}
for agent, stats in agent_stats.items():
if stats['total_tasks'] > 0:
agent_efficiency[agent] = {
'avg_time_per_task': stats['total_time'] / stats['total_tasks'],
'avg_confidence': np.mean(stats['confidence_scores']) if stats['confidence_scores'] else 0,
'total_tasks': stats['total_tasks']
}
return {
'total_workflows_executed': total_executions,
'average_execution_time': avg_execution_time,
'average_success_rate': avg_success_rate,
'agent_efficiency': agent_efficiency,
'most_efficient_agent': min(agent_efficiency.items(),
key=lambda x: x[1]['avg_time_per_task'])[0] if agent_efficiency else None,
'highest_quality_agent': max(agent_efficiency.items(),
key=lambda x: x[1]['avg_confidence'])[0] if agent_efficiency else None
}
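    # Illustrative usage sketch (not executed). After a few workflows have run:
    #
    #     insights = coordinator.get_workflow_insights()
    #     insights['most_efficient_agent']    # e.g. "Researcher-1"
    #     insights['agent_efficiency']['Researcher-1']['avg_time_per_task']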
class WorkflowVisualizer:
"""Handles visualization of agent interactions and workflow."""
def __init__(self):
self.color_map = Config.NODE_COLORS
self.layout_cache = {}
self.animation_frames = []
def create_workflow_graph(self, workflow_graph: nx.DiGraph,
active_agents: List[str] = None,
highlight_tasks: List[str] = None) -> go.Figure:
"""Create an interactive workflow visualization."""
if len(workflow_graph.nodes()) == 0:
return self._create_empty_graph()
pos = self._hierarchical_layout(workflow_graph)
edge_trace = self._create_edge_trace(workflow_graph, pos)
node_trace = self._create_node_trace(workflow_graph, pos, active_agents, highlight_tasks)
status_trace = self._create_status_indicators(workflow_graph, pos, active_agents)
traces = [edge_trace, node_trace]
if status_trace:
traces.append(status_trace)
fig = go.Figure(
data=traces,
layout=go.Layout(
title={
'text': 'Agent Collaboration Network',
'x': 0.5,
'xanchor': 'center',
'font': {'size': 20, 'color': '#2c3e50'}
},
showlegend=False,
hovermode='closest',
margin=dict(b=40, l=40, r=40, t=60),
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
plot_bgcolor='#f8f9fa',
paper_bgcolor='white',
font=dict(family='Inter, sans-serif'),
annotations=self._create_annotations(workflow_graph, pos)
)
)
fig.update_layout(
shapes=self._create_background_shapes(pos)
)
return fig
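    # Illustrative sketch (not executed). Assuming a result dict from
    # CoordinatorAgent.execute_workflow, the figure renders standalone:
    #
    #     fig = visualizer.create_workflow_graph(result['workflow_graph'],
    #                                            active_agents=['Researcher-1'])
    #     fig.show()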
def _create_empty_graph(self) -> go.Figure:
"""Create empty graph with message."""
fig = go.Figure()
fig.add_annotation(
text="No workflow data to display.<br>Start an analysis to see agent interactions.",
xref="paper", yref="paper",
x=0.5, y=0.5,
showarrow=False,
font=dict(size=16, color='#7f8c8d'),
bgcolor='#ecf0f1',
borderpad=20
)
fig.update_layout(
height=500,
plot_bgcolor='#f8f9fa',
paper_bgcolor='white'
)
return fig
    def _hierarchical_layout(self, G: nx.DiGraph) -> Dict:
        """Create a hierarchical layout for the graph."""
        # Cache layouts keyed by the edge set; node-only changes reuse a cached
        # layout, and nodes missing from it are simply skipped downstream.
        graph_hash = hash(tuple(sorted(G.edges())))
        if graph_hash in self.layout_cache:
            return self.layout_cache[graph_hash]
hierarchy = {
'Coordinator': 0,
'Researcher': 1,
'Analyst': 2,
'Critic': 3,
'Synthesizer': 4
}
agent_nodes = [n for n in G.nodes() if G.nodes[n].get('role')]
task_nodes = [n for n in G.nodes() if 'task_' in str(n)]
pos = {}
        role_x_positions = {}
        # Spread roles horizontally, centered on x = 0; the loop index sets the column.
        for i, role in enumerate(hierarchy):
            role_x_positions[role] = i * 3 - 6
for node in agent_nodes:
role = G.nodes[node].get('role', 'Unknown')
level = hierarchy.get(role, 5)
x = role_x_positions.get(role, 0)
y = -level * 2.5
pos[node] = (x, y)
if task_nodes:
task_start_x = max(role_x_positions.values()) + 4 if role_x_positions else 4
cols = 3
for i, task in enumerate(task_nodes):
row = i // cols
col = i % cols
pos[task] = (task_start_x + col * 2, -row * 1.5)
self.layout_cache[graph_hash] = pos
return pos
def _create_edge_trace(self, G: nx.DiGraph, pos: Dict) -> go.Scatter:
"""Create edge trace for the graph."""
edge_x = []
edge_y = []
        for edge in G.edges():
            if edge[0] in pos and edge[1] in pos:
                x0, y0 = pos[edge[0]]
                x1, y1 = pos[edge[1]]
                # Control point sits below the midpoint, giving each edge a gentle arc.
                mid_x = (x0 + x1) / 2
                mid_y = (y0 + y1) / 2 - 0.5
                # Sample the quadratic Bezier curve
                # B(t) = (1-t)^2*P0 + 2(1-t)t*P_ctrl + t^2*P1 at 10 points.
                t = np.linspace(0, 1, 10)
                for ti in t:
                    x = (1-ti)**2 * x0 + 2*(1-ti)*ti * mid_x + ti**2 * x1
                    y = (1-ti)**2 * y0 + 2*(1-ti)*ti * mid_y + ti**2 * y1
                    edge_x.append(x)
                    edge_y.append(y)
                # A None entry breaks the line between consecutive edges.
                edge_x.append(None)
                edge_y.append(None)
edge_trace = go.Scatter(
x=edge_x, y=edge_y,
line=dict(width=2, color='#bdc3c7'),
hoverinfo='none',
mode='lines',
opacity=0.6
)
return edge_trace
def _create_node_trace(self, G: nx.DiGraph, pos: Dict,
active_agents: List[str] = None,
highlight_tasks: List[str] = None) -> go.Scatter:
"""Create node trace for the graph."""
node_x, node_y, node_colors, node_sizes = [], [], [], []
node_text, node_hover, node_symbols = [], [], []
for node in G.nodes():
if node in pos:
x, y = pos[node]
node_x.append(x)
node_y.append(y)
node_data = G.nodes[node]
role = node_data.get('role', '')
if role:
color = self.color_map.get(role, '#95a5a6')
size = 35
symbol = 'circle'
if active_agents and node in active_agents:
size = 45
color = self._brighten_color(color)
hover_text = f"<b>{node}</b><br>Role: {role}<br>Status: Active"
display_text = node.split('-')[0]
else:
status = node_data.get('status', 'pending')
color = self._get_status_color(status)
size = 25
symbol = 'square'
if highlight_tasks and str(node) in highlight_tasks:
size = 30
color = self._brighten_color(color)
task_desc = node_data.get('task_description', 'Task')
hover_text = f"<b>{node}</b><br>{task_desc}<br>Status: {status}"
                    # Abbreviate long ids with the suffix after the last underscore
                    # so multi-digit task ids keep distinct labels.
                    display_text = node if len(str(node)) < 10 else f"T{str(node).split('_')[-1]}"
node_colors.append(color)
node_sizes.append(size)
node_text.append(display_text)
node_hover.append(hover_text)
node_symbols.append(symbol)
node_trace = go.Scatter(
x=node_x, y=node_y,
mode='markers+text',
hoverinfo='text',
text=node_text,
hovertext=node_hover,
textposition="bottom center",
textfont=dict(size=10, color='#2c3e50'),
marker=dict(
showscale=False,
color=node_colors,
size=node_sizes,
symbol=node_symbols,
line=dict(color='white', width=2)
)
)
return node_trace
def _create_status_indicators(self, G: nx.DiGraph, pos: Dict,
active_agents: List[str] = None) -> Optional[go.Scatter]:
"""Create status indicators for active agents."""
if not active_agents:
return None
indicator_x = []
indicator_y = []
for agent in active_agents:
if agent in pos:
x, y = pos[agent]
indicator_x.append(x)
indicator_y.append(y + 0.5)
if not indicator_x:
return None
return go.Scatter(
x=indicator_x,
y=indicator_y,
mode='markers',
marker=dict(
size=8,
color='#2ecc71',
symbol='circle',
line=dict(width=2, color='white')
),
hoverinfo='skip'
)
def _create_annotations(self, G: nx.DiGraph, pos: Dict) -> List[Dict]:
"""Create annotations for the graph."""
annotations = []
role_positions = {}
for node in G.nodes():
if G.nodes[node].get('role'):
role = G.nodes[node]['role']
if role not in role_positions:
x, y = pos[node]
role_positions[role] = (x, y - 0.8)
for role, (x, y) in role_positions.items():
annotations.append(dict(
x=x, y=y,
text=f"<b>{role}</b>",
showarrow=False,
font=dict(size=12, color='#34495e'),
opacity=0.8
))
return annotations
def _create_background_shapes(self, pos: Dict) -> List[Dict]:
"""Create background shapes for visual grouping."""
shapes = []
if pos:
x_coords = [p[0] for p in pos.values()]
y_coords = [p[1] for p in pos.values()]
if x_coords and y_coords:
padding = 1.5
shapes.append(dict(
type='rect',
x0=min(x_coords) - padding,
y0=min(y_coords) - padding,
x1=max(x_coords) + padding,
y1=max(y_coords) + padding,
fillcolor='#ecf0f1',
opacity=0.3,
line=dict(width=0)
))
return shapes
def _get_status_color(self, status: str) -> str:
"""Get color based on task status."""
status_colors = {
'completed': '#2ecc71',
'in_progress': '#f39c12',
'pending': '#95a5a6',
'failed': '#e74c3c'
}
return status_colors.get(status, '#95a5a6')
def _brighten_color(self, color: str) -> str:
"""Make a color brighter for highlighting."""
if color.startswith('#'):
r = int(color[1:3], 16)
g = int(color[3:5], 16)
b = int(color[5:7], 16)
factor = 0.3
r = int(r + (255 - r) * factor)
g = int(g + (255 - g) * factor)
b = int(b + (255 - b) * factor)
return f"#{r:02x}{g:02x}{b:02x}"
return color
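    # Worked example: '#3498db' (the Researcher blue) brightens to '#70b6e5',
    # since each channel moves 30% of the way toward 255.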
def create_task_timeline(self, tasks: List[Task]) -> go.Figure:
"""Create a timeline visualization of task execution."""
timeline_data = []
for task in tasks:
if task.created_at:
end_time = task.completed_at if task.completed_at else datetime.now()
timeline_data.append({
'Task': task.id,
'Agent': task.assigned_to or 'Unassigned',
'Start': task.created_at,
'Finish': end_time,
'Status': task.status.value,
'Confidence': task.confidence
})
if not timeline_data:
return self._create_empty_timeline()
df = pd.DataFrame(timeline_data)
fig = px.timeline(
df,
x_start="Start",
x_end="Finish",
y="Agent",
color="Confidence",
hover_data=["Task", "Status"],
color_continuous_scale="Viridis",
labels={'Confidence': 'Confidence Score'}
)
fig.update_layout(
title={
'text': 'Task Execution Timeline',
'x': 0.5,
'xanchor': 'center',
'font': {'size': 18, 'color': '#2c3e50'}
},
height=400,
xaxis_title="Time",
yaxis_title="Agent",
plot_bgcolor='#f8f9fa',
paper_bgcolor='white',
font=dict(family='Inter, sans-serif'),
yaxis={'categoryorder': 'total ascending'}
)
fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='#e0e0e0')
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#e0e0e0')
return fig
def _create_empty_timeline(self) -> go.Figure:
"""Create empty timeline with message."""
fig = go.Figure()
fig.add_annotation(
text="No task execution data available yet.",
xref="paper", yref="paper",
x=0.5, y=0.5,
showarrow=False,
font=dict(size=14, color='#7f8c8d')
)
fig.update_layout(
height=400,
plot_bgcolor='#f8f9fa',
paper_bgcolor='white'
)
return fig
def create_confidence_heatmap(self, agent_contributions: Dict[str, Any]) -> go.Figure:
"""Create a heatmap showing agent performance metrics."""
if not agent_contributions:
return self._create_empty_heatmap()
agents = list(agent_contributions.keys())
metrics = ['Tasks Completed', 'Avg Confidence', 'Time Efficiency', 'Collaboration Score']
data = []
for metric in metrics:
row = []
for agent in agents:
contrib = agent_contributions[agent]
if metric == 'Tasks Completed':
value = contrib.get('tasks_completed', 0)
elif metric == 'Avg Confidence':
value = contrib.get('average_confidence', 0)
                elif metric == 'Time Efficiency':
                    time_val = contrib.get('total_execution_time', 1)
                    tasks_val = contrib.get('tasks_completed', 1)
                    avg_time = time_val / tasks_val if tasks_val > 0 else float('inf')
                    # Normalize against a 30-second-per-task target: 30s -> 1.0, 60s -> 0.5.
                    value = min(1.0, 30.0 / avg_time) if avg_time > 0 else 0
elif metric == 'Collaboration Score':
value = min(1.0, contrib.get('collaboration_count', 0) / 5.0)
else:
value = 0
row.append(value)
data.append(row)
fig = go.Figure(data=go.Heatmap(
z=data,
x=agents,
y=metrics,
colorscale='Blues',
text=np.round(data, 2),
texttemplate='%{text}',
textfont={"size": 12},
colorbar=dict(title="Score", titleside="right"),
hoverongaps=False
))
fig.update_layout(
title={
'text': 'Agent Performance Metrics',
'x': 0.5,
'xanchor': 'center',
'font': {'size': 18, 'color': '#2c3e50'}
},
xaxis_title="Agents",
yaxis_title="Metrics",
height=350,
plot_bgcolor='white',
paper_bgcolor='white',
font=dict(family='Inter, sans-serif')
)
fig.update_xaxes(showgrid=False, showline=True, linewidth=2, linecolor='#2c3e50')
fig.update_yaxes(showgrid=False, showline=True, linewidth=2, linecolor='#2c3e50')
return fig
def _create_empty_heatmap(self) -> go.Figure:
"""Create empty heatmap with message."""
fig = go.Figure()
fig.add_annotation(
text="No agent performance data available yet.",
xref="paper", yref="paper",
x=0.5, y=0.5,
showarrow=False,
font=dict(size=14, color='#7f8c8d')
)
fig.update_layout(
height=350,
plot_bgcolor='#f8f9fa',
paper_bgcolor='white'
)
return fig
def create_performance_comparison(self, performance_metrics: Dict[str, Any]) -> go.Figure:
"""Create performance comparison visualization."""
        # The baseline is a fixed reference figure from Config, not a measured single-agent run.
        baseline_time = Config.BENCHMARK_BASELINE['single_agent_time']
        actual_time = performance_metrics.get('average_task_completion_time', baseline_time)
        time_improvement = performance_metrics.get('time_improvement_percentage', 0)
categories = ['Single Agent', 'Multi-Agent System']
values = [baseline_time, actual_time]
colors = ['#e74c3c', '#2ecc71']
fig = go.Figure(data=[
go.Bar(
x=categories,
y=values,
marker_color=colors,
text=[f'{v:.1f}s' for v in values],
textposition='auto'
)
])
        # Annotate the multi-agent bar; flip the wording if the system was slower.
        label = (f"{time_improvement:.1f}% Faster" if time_improvement >= 0
                 else f"{abs(time_improvement):.1f}% Slower")
        fig.add_annotation(
            x=1, y=actual_time + (baseline_time * 0.1),
            text=label,
            showarrow=True,
            arrowhead=2,
            arrowsize=1,
            arrowwidth=2,
            arrowcolor='#2ecc71',
            font=dict(size=14, color='#2ecc71')
        )
fig.update_layout(
title={
'text': 'Performance Comparison',
'x': 0.5,
'xanchor': 'center',
'font': {'size': 18, 'color': '#2c3e50'}
},
yaxis_title="Average Completion Time (seconds)",
height=400,
plot_bgcolor='#f8f9fa',
paper_bgcolor='white',
font=dict(family='Inter, sans-serif'),
showlegend=False
)
return fig
class ReportGenerator:
"""Generates comprehensive reports from multi-agent collaboration."""
def __init__(self):
self.section_generators = {
'executive_summary': self._generate_executive_summary,
'task_analysis': self._generate_task_analysis,
'agent_contributions': self._generate_agent_contributions,
'key_findings': self._generate_key_findings,
'recommendations': self._generate_recommendations,
'confidence_analysis': self._generate_confidence_analysis,
'performance_metrics': self._generate_performance_metrics
}
def generate_report(self,
workflow_result: Dict[str, Any],
problem_statement: str,
include_sections: List[str] = None) -> str:
"""Generate a comprehensive report from workflow results."""
if include_sections is None:
include_sections = list(self.section_generators.keys())
report_sections = []
report_sections.append(self._generate_header(problem_statement))
for section in include_sections:
if section in self.section_generators:
section_content = self.section_generators[section](workflow_result, problem_statement)
if section_content:
report_sections.append(section_content)
report_sections.append(self._generate_footer(workflow_result))
complete_report = "\n\n".join(report_sections)
styled_report = f"""<div style="font-family: 'Inter', sans-serif; line-height: 1.6; color: #2c3e50;">
{complete_report}
</div>"""
return styled_report
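    # Illustrative usage sketch (not executed). Given a finished workflow result:
    #
    #     generator = ReportGenerator()
    #     html = generator.generate_report(
    #         workflow_result,
    #         problem_statement="Assess AI adoption risks",
    #         include_sections=['executive_summary', 'recommendations'],
    #     )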
def _generate_header(self, problem_statement: str) -> str:
"""Generate report header."""
return f"""# Multi-Agent Analysis Report
<div style="background-color: #ecf0f1; padding: 20px; border-radius: 8px; margin-bottom: 20px;">
**Generated:** {datetime.now().strftime('%B %d, %Y at %I:%M %p')}
**System:** {Config.COMPANY_NAME}
**Problem Statement:** {problem_statement}
</div>
---"""
def _generate_executive_summary(self, workflow_result: Dict[str, Any],
problem_statement: str) -> str:
"""Generate executive summary section."""
tasks = workflow_result.get('tasks', [])
success_rate = workflow_result.get('success_rate', 0)
execution_time = workflow_result.get('execution_time', 0)
performance = workflow_result.get('performance_metrics', {})
synthesis_task = next((task for task in tasks if 'Synthesizer' in (task.assigned_to or '')), None)
summary_content = ""
if synthesis_task and isinstance(synthesis_task.result, dict):
exec_summary = synthesis_task.result.get('executive_summary', '')
if exec_summary:
summary_content = f"\n\n{exec_summary}"
time_improvement = performance.get('time_improvement_percentage', 0)
efficiency_score = performance.get('efficiency_score', 0)
summary = f"""## Executive Summary
<div style="background-color: #e8f5e9; padding: 15px; border-radius: 5px; margin: 10px 0;">
**Key Performance Indicators:**
- **Task Completion Rate:** {success_rate:.0%}
- **Total Execution Time:** {execution_time:.1f} seconds
- **Performance Improvement:** {time_improvement:.1f}% faster than single-agent approach
- **System Efficiency Score:** {efficiency_score:.2f}/1.0
</div>
The multi-agent system successfully analyzed the problem through coordinated efforts of specialized agents, achieving superior results compared to traditional single-agent approaches.
{summary_content}
**Bottom Line:** The analysis provides actionable insights with high confidence, leveraging the collective intelligence of multiple specialized agents."""
return summary
def _generate_task_analysis(self, workflow_result: Dict[str, Any],
problem_statement: str) -> str:
"""Generate task analysis section."""
tasks = workflow_result.get('tasks', [])
if not tasks:
return ""
content = "## Task Analysis\n\n"
completed_tasks = [t for t in tasks if t.status == TaskStatus.COMPLETED]
failed_tasks = [t for t in tasks if t.status == TaskStatus.FAILED]
content += f"""<div style="background-color: #f8f9fa; padding: 15px; border-radius: 5px; margin: 10px 0;">
**Task Overview:**
- Total Tasks: {len(tasks)}
- Completed: {len(completed_tasks)}
- Failed: {len(failed_tasks)}
- Average Confidence: {np.mean([t.confidence for t in completed_tasks]) if completed_tasks else 0:.2%}
</div>
### Task Breakdown by Agent\n\n"""
agents_tasks = {}
for task in tasks:
agent = task.assigned_to or "Unassigned"
if agent not in agents_tasks:
agents_tasks[agent] = []
agents_tasks[agent].append(task)
for agent, agent_tasks in agents_tasks.items():
role = "Unknown"
if agent_tasks:
role = agent_tasks[0].metadata.get('suggested_role', 'Unknown')
content += f"#### {agent} ({role})\n\n"
for task in agent_tasks:
status_symbol = "Completed" if task.status == TaskStatus.COMPLETED else "Failed"
confidence_bar = self._create_confidence_bar(task.confidence)
exec_time = task.performance_metrics.get('execution_time', 0)
content += f"""<div style="margin-bottom: 10px; padding: 10px; background-color: #f0f0f0; border-radius: 5px;">
**Task {task.id} ({status_symbol})** *Description:* {task.description}
*Execution Time:* {exec_time:.1f}s
*Confidence:* {confidence_bar} {task.confidence:.0%}
</div>"""
return content
    def _create_confidence_bar(self, confidence: float) -> str:
        """Create a visual confidence bar, e.g. 0.7 -> [███████░░░]."""
        filled_count = int(confidence * 10)
        empty_count = 10 - filled_count
        # Light-shade blocks mark the empty portion; plain spaces collapse in rendered HTML.
        return f"[{'█' * filled_count}{'░' * empty_count}]"
def _generate_agent_contributions(self, workflow_result: Dict[str, Any],
problem_statement: str) -> str:
"""Generate agent contributions section."""
contributions = workflow_result.get('agent_contributions', {})
if not contributions:
return ""
content = "## Agent Contributions\n\n"
for agent, stats in contributions.items():
role = stats.get('role', 'Unknown')
tasks_completed = stats.get('tasks_completed', 0)
avg_confidence = stats.get('average_confidence', 0)
exec_time = stats.get('total_execution_time', 0)
collab_count = stats.get('collaboration_count', 0)
role_color = Config.NODE_COLORS.get(role, '#95a5a6')
content += f"""<div style="border-left: 4px solid {role_color}; padding-left: 15px; margin-bottom: 20px;">
### {agent}
**Role:** {role}
| Metric | Value |
|--------|-------|
| Tasks Completed | {tasks_completed} |
| Average Confidence | {avg_confidence:.0%} |
| Total Execution Time | {exec_time:.1f}s |
| Collaborations | {collab_count} |
</div>"""
return content
def _generate_key_findings(self, workflow_result: Dict[str, Any],
problem_statement: str) -> str:
"""Generate key findings section."""
tasks = workflow_result.get('tasks', [])
content = "## Key Findings\n\n"
findings_by_type = {
'Research Findings': [],
'Analytical Insights': [],
'Critical Observations': [],
'Synthesized Conclusions': []
}
for task in tasks:
if task.status == TaskStatus.COMPLETED and isinstance(task.result, dict):
role = task.metadata.get('suggested_role', '')
if 'Researcher' in role and 'key_points' in task.result:
findings_by_type['Research Findings'].extend(task.result['key_points'][:3])
elif 'Analyst' in role and 'insights' in task.result:
findings_by_type['Analytical Insights'].extend(task.result['insights'][:3])
elif 'Critic' in role and 'strengths' in task.result:
for strength in task.result['strengths'][:2]:
findings_by_type['Critical Observations'].append(strength.get('strength', str(strength)) if isinstance(strength, dict) else str(strength))
elif 'Synthesizer' in role and 'key_themes' in task.result:
for theme in task.result['key_themes'][:2]:
findings_by_type['Synthesized Conclusions'].append(theme.get('description', str(theme)) if isinstance(theme, dict) else str(theme))
has_findings = False
for finding_type, findings in findings_by_type.items():
if findings:
has_findings = True
content += f"### {finding_type}\n\n"
for finding in findings:
content += f"- {finding}\n"
content += "\n"
if not has_findings:
return content + "No specific findings were extracted from the analysis.\n"
return content
def _generate_recommendations(self, workflow_result: Dict[str, Any],
problem_statement: str) -> str:
"""Generate recommendations section."""
tasks = workflow_result.get('tasks', [])
content = "## Recommendations\n\n"
all_recommendations = []
for task in tasks:
if task.status == TaskStatus.COMPLETED and isinstance(task.result, dict):
for field in ['recommendations', 'final_recommendations', 'improvements']:
if field in task.result:
recs = task.result[field]
for rec in recs:
if isinstance(rec, dict):
all_recommendations.append(rec)
else:
all_recommendations.append({'recommendation': str(rec), 'priority': 'medium'})
if not all_recommendations:
return content + "No specific recommendations were generated.\n"
        # Bucket recommendations by priority for ordered presentation.
        high, medium, low = [], [], []
        for rec in all_recommendations:
            priority = rec.get('priority', 'medium')
            if priority == 'high':
                high.append(rec)
            elif priority == 'low':
                low.append(rec)
            else:
                medium.append(rec)
        if high:
            content += "### High Priority\n"
            for rec in high[:3]:
                content += f"- {rec.get('recommendation', rec)}\n"
            content += "\n"
        if medium:
            content += "### Medium Priority\n"
            for rec in medium[:4]:
                content += f"- {rec.get('recommendation', rec)}\n"
            content += "\n"
        if low:
            content += "### Low Priority\n"
            for rec in low[:3]:
                content += f"- {rec.get('recommendation', rec)}\n"
            content += "\n"
return content
def _generate_confidence_analysis(self, workflow_result: Dict[str, Any],
problem_statement: str) -> str:
"""Generate confidence analysis section."""
tasks = workflow_result.get('tasks', [])
contributions = workflow_result.get('agent_contributions', {})
content = "## Confidence Analysis\n\n"
task_confidences = [t.confidence for t in tasks if t.confidence > 0]
overall_confidence = np.mean(task_confidences) if task_confidences else 0
confidence_level = "High" if overall_confidence > 0.8 else "Medium" if overall_confidence > 0.6 else "Low"
confidence_color = "#2ecc71" if confidence_level == "High" else "#f39c12" if confidence_level == "Medium" else "#e74c3c"
content += f"""<div style="text-align: center; margin: 20px 0;">
<div style="display: inline-block; padding: 20px; background-color: {confidence_color}; color: white; border-radius: 10px;">
<h3 style="margin: 0;">Overall Confidence Score</h3>
<h1 style="margin: 10px 0; font-size: 2.5em;">{overall_confidence:.0%}</h1>
<p style="margin: 0;">Level: {confidence_level}</p>
</div>
</div>
### Confidence by Agent Role
"""
content += "| Agent Role | Average Confidence | Tasks |\n"
content += "|------------|--------------------|-------|\n"
for agent, stats in contributions.items():
role = stats.get('role', 'Unknown')
avg_conf = stats.get('average_confidence', 0)
tasks_completed = stats.get('tasks_completed', 0)
content += f"| {role} | {avg_conf:.0%} | {tasks_completed} |\n"
content += "\n### Confidence Distribution\n\n"
high_conf = len([t for t in tasks if t.confidence >= 0.8])
medium_conf = len([t for t in tasks if 0.5 <= t.confidence < 0.8])
low_conf = len([t for t in tasks if t.confidence < 0.5])
content += f"""- **High Confidence (≥80%):** {high_conf} tasks
- **Medium Confidence (50-79%):** {medium_conf} tasks
- **Low Confidence (<50%):** {low_conf} tasks
*Confidence scores reflect the system's assessment of result quality and reliability based on evidence strength, consistency, and completeness.*"""
return content
def _generate_performance_metrics(self, workflow_result: Dict[str, Any],
problem_statement: str) -> str:
"""Generate performance metrics section."""
performance = workflow_result.get('performance_metrics', {})
if not performance:
return ""
content = "## Performance Metrics\n\n"
total_time = performance.get('total_execution_time', 0)
avg_task_time = performance.get('average_task_completion_time', 0)
total_collab = performance.get('total_collaborations', 0)
total_messages = performance.get('total_messages', 0)
efficiency = performance.get('efficiency_score', 0)
time_improvement = performance.get('time_improvement_percentage', 0)
content += f"""<div style="background-color: #f0f4f8; padding: 20px; border-radius: 8px;">
### System Performance Overview
| Metric | Value | Benchmark |
|--------|-------|-----------|
| Total Execution Time | {total_time:.1f}s | - |
| Average Task Time | {avg_task_time:.1f}s | {Config.BENCHMARK_BASELINE['single_agent_time']:.1f}s |
| Time Improvement | {time_improvement:.1f}% | Baseline |
| Total Collaborations | {total_collab} | - |
| Message Exchanges | {total_messages} | - |
| Efficiency Score | {efficiency:.2%} | >50% |
</div>
### Performance Insights
"""
if time_improvement > 30:
content += "- **Exceptional Performance**: The multi-agent system achieved significant time savings through parallel processing.\n"
elif time_improvement > 15:
content += "- **Good Performance**: The system demonstrated efficient task distribution and execution.\n"
else:
content += "- **Standard Performance**: The system completed tasks within expected parameters.\n"
if efficiency > 0.8:
content += "- **High Efficiency**: Excellent resource utilization and agent coordination.\n"
elif efficiency > 0.6:
content += "- **Moderate Efficiency**: Good balance between speed and quality.\n"
else:
content += "- **Efficiency Opportunity**: Workflows could be further optimized for performance.\n"
return content
def _generate_footer(self, workflow_result: Dict[str, Any]) -> str:
"""Generate report footer."""
execution_time = workflow_result.get('execution_time', 0)
timestamp = workflow_result.get('timestamp', datetime.now())
return f"""---
<div style="text-align: center; color: #7f8c8d; margin-top: 40px; padding: 20px; background-color: #ecf0f1; border-radius: 5px;">
**Report Generation Details** Analysis completed in {execution_time:.1f} seconds
Report generated at {timestamp.strftime('%B %d, %Y at %I:%M %p')}
*Powered by {Config.COMPANY_NAME}* *Advanced Multi-Agent AI Collaboration System*
</div>"""
def create_gradio_interface():
"""Create the main Gradio interface for the multi-agent system."""
coordinator = None
visualizer = WorkflowVisualizer()
report_generator = ReportGenerator()
current_workflow = None
current_problem = ""
demo_mode = False
def initialize_agents(api_key: str, model: str = "gpt-4", use_demo: bool = False) -> str:
"""Initialize the multi-agent system."""
nonlocal coordinator, demo_mode
demo_mode = use_demo
if not use_demo and not api_key:
return "Error: Please provide an OpenAI API key or enable Demo Mode to initialize the agents."
try:
            # llm stays None in demo mode; the agent classes are expected to fall
            # back to simulated responses when no model is supplied.
            llm = None
            if not use_demo and api_key:
                llm = ChatOpenAI(
                    api_key=api_key,
                    model=model,
                    temperature=Config.TEMPERATURE,
                    max_tokens=Config.MAX_TOKENS
                )
            coordinator = CoordinatorAgent("Coordinator", llm)
agents_to_register = [
ResearcherAgent("Researcher-1", llm),
AnalystAgent("Analyst-1", llm),
CriticAgent("Critic-1", llm),
SynthesizerAgent("Synthesizer-1", llm)
]
for agent in agents_to_register:
coordinator.register_agent(agent)
mode_text = "Demo Mode" if use_demo else f"Live Mode ({model})"
return f"Success: Initialized multi-agent system with {len(coordinator.agents)} agents in {mode_text}."
except Exception as e:
logger.error(f"Error initializing agents: {str(e)}")
return f"Error initializing agents: {str(e)}"
async def analyze_problem(problem: str, execution_mode: str, use_template: bool = False) -> Tuple[str, Any, Any, Any, Any]:
"""Analyze a problem using the multi-agent system."""
nonlocal current_workflow, current_problem
if not coordinator:
return "Error: Please initialize the agents first.", None, None, None, None
if not problem:
return "Error: Please enter a problem to analyze.", None, None, None, None
current_problem = problem
try:
tasks = await coordinator.decompose_problem(problem, use_template=use_template)
if not tasks:
return "Error: Failed to decompose problem into tasks.", None, None, None, None
parallel = execution_mode == "Parallel"
current_workflow = await coordinator.execute_workflow(tasks, parallel=parallel)
active_agents = list(coordinator.agents.keys())
workflow_graph = visualizer.create_workflow_graph(
current_workflow['workflow_graph'],
active_agents=active_agents
)
timeline_chart = visualizer.create_task_timeline(tasks)
confidence_heatmap = visualizer.create_confidence_heatmap(
current_workflow['agent_contributions']
)
performance_chart = visualizer.create_performance_comparison(
current_workflow['performance_metrics']
)
success_rate = current_workflow['success_rate']
execution_time = current_workflow['execution_time']
performance = current_workflow['performance_metrics']
status = f"""Analysis completed successfully!
**Results Summary:**
- Tasks executed: {len(tasks)}
- Success rate: {success_rate:.0%}
- Execution time: {execution_time:.1f} seconds
- Performance improvement: {performance.get('time_improvement_percentage', 0):.1f}% faster
- Agents involved: {len(coordinator.agents)}
**Agent Activity:**
- Total collaborations: {performance.get('total_collaborations', 0)}
- Messages exchanged: {performance.get('total_messages', 0)}
- Efficiency score: {performance.get('efficiency_score', 0):.2%}"""
return status, workflow_graph, timeline_chart, confidence_heatmap, performance_chart
except Exception as e:
logger.error(f"Error analyzing problem: {str(e)}")
return f"Error during analysis: {str(e)}", None, None, None, None
def generate_report(selected_sections: List[str]) -> str:
"""Generate a report from the current workflow results."""
if not current_workflow:
return "Error: No analysis results available. Please run an analysis first."
try:
report = report_generator.generate_report(
current_workflow,
current_problem,
include_sections=selected_sections
)
return report
except Exception as e:
logger.error(f"Error generating report: {str(e)}")
return f"Error generating report: {str(e)}"
def get_agent_details(agent_name: str) -> str:
"""Get detailed information about a specific agent."""
if not coordinator or agent_name not in coordinator.agents:
return "Error: Agent not found or system not initialized."
agent = coordinator.agents[agent_name]
status = agent.get_status_summary()
details = f"""## Agent Profile: {agent.name}
<div style="background-color: #f8f9fa; padding: 20px; border-radius: 8px;">
**Role:** {agent.role.value}
**Status:** {'Active' if agent.active else 'Inactive'}
**Completed Tasks:** {len(agent.completed_tasks)}
**Current Task:** {status['current_task']}
**Average Confidence:** {status['average_confidence']:.0%}
**Collaborations:** {status['collaboration_count']}
</div>
### Recent Task History
"""
if not agent.completed_tasks:
details += "No tasks completed yet."
else:
for i, task in enumerate(agent.completed_tasks[-5:], 1):
status_icon = "Completed" if task.status == TaskStatus.COMPLETED else "Failed"
exec_time = task.performance_metrics.get('execution_time', 0)
details += f"""
**{i}. {status_icon} {task.id}**
- Description: {task.description}
- Confidence: {task.confidence:.0%}
- Execution Time: {exec_time:.1f}s
"""
if agent.performance_tracker.metrics['task_completion_times']:
avg_time = np.mean(agent.performance_tracker.metrics['task_completion_times'])
details += f"\n### Performance Statistics\n"
details += f"- Average Task Time: {avg_time:.1f}s\n"
details += f"- Total Active Time: {sum(agent.performance_tracker.metrics['task_completion_times']):.1f}s\n"
return details
def get_workflow_insights() -> str:
"""Get insights about the multi-agent system performance."""
if not coordinator:
return "Error: System not initialized."
insights = coordinator.get_workflow_insights()
if insights.get('total_workflows_executed', 0) == 0:
return "No workflow executions yet. Run an analysis to see performance insights."
content = f"""## Workflow Insights
<div style="background-color: #e3f2fd; padding: 20px; border-radius: 8px;">
### System Performance Overview
- **Total Workflows:** {insights['total_workflows_executed']}
- **Average Execution Time:** {insights['average_execution_time']:.1f}s
- **Average Success Rate:** {insights['average_success_rate']:.0%}
- **Most Efficient Agent:** {insights.get('most_efficient_agent', 'N/A')}
- **Highest Quality Agent:** {insights.get('highest_quality_agent', 'N/A')}
</div>
### Agent Efficiency Rankings
"""
if insights.get('agent_efficiency'):
content += "\n| Agent | Avg Time/Task | Avg Confidence | Total Tasks |\n"
content += "|-------|---------------|----------------|-------------|\n"
for agent, efficiency in insights['agent_efficiency'].items():
content += f"| {agent} | {efficiency['avg_time_per_task']:.1f}s | "
content += f"{efficiency['avg_confidence']:.0%} | {efficiency['total_tasks']} |\n"
return content
custom_css = """
.gradio-container { font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', sans-serif; max-width: 1400px; margin: 0 auto; }
.gr-button { font-weight: 600; transition: all 0.3s ease; }
.gr-button-primary { background-color: #3498db !important; border-color: #3498db !important; }
.gr-button-primary:hover { background-color: #2980b9 !important; border-color: #2980b9 !important; transform: translateY(-1px); box-shadow: 0 4px 12px rgba(52, 152, 219, 0.3); }
.gr-button-secondary { background-color: #95a5a6 !important; border-color: #95a5a6 !important; }
h1 { color: #2c3e50; font-weight: 800; margin-bottom: 0.5rem; }
h2 { color: #34495e; font-weight: 600; margin-top: 1.5rem; }
h3 { color: #34495e; font-weight: 600; }
.gr-box { border-radius: 8px; border-color: #e0e0e0; }
.status-text { font-size: 14px; line-height: 1.6; }
.markdown-text { line-height: 1.6; color: #2c3e50; }
"""
with gr.Blocks(title="Multi-Agent AI Collaboration System", theme=gr.themes.Base(), css=custom_css) as interface:
gr.HTML("""
<div style="text-align: center; margin-bottom: 30px;">
<h1 style="font-size: 2.5em; margin-bottom: 10px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent;">
Multi-Agent AI Collaboration System
</h1>
<p style="font-size: 1.1em; color: #7f8c8d;">
Advanced AI agents working together to solve complex problems through intelligent task decomposition and parallel processing.
</p>
</div>
""")
with gr.Group():
gr.Markdown("### System Configuration")
with gr.Row():
with gr.Column(scale=3):
api_key_input = gr.Textbox(label="OpenAI API Key", placeholder="sk-...", type="password", info="Required for live mode. Leave empty for demo mode.")
with gr.Column(scale=1):
model_select = gr.Dropdown(choices=["gpt-4", "gpt-3.5-turbo"], value="gpt-4", label="Model", info="Select the LLM model")
with gr.Column(scale=1):
demo_mode_checkbox = gr.Checkbox(label="Demo Mode", value=True, info="Run without an API key")
with gr.Column(scale=1):
init_button = gr.Button("Initialize Agents", variant="primary", size="lg")
init_status = gr.Textbox(label="Initialization Status", interactive=False, elem_classes=["status-text"])
with gr.Tabs() as tabs:
with gr.TabItem("Problem Analysis", id=1):
with gr.Group():
gr.Markdown("### Enter a complex problem for multi-agent analysis")
problem_input = gr.Textbox(label="Problem Statement", placeholder="Example: Analyze the potential impact of AI on healthcare delivery...", lines=3, info="Describe a complex problem that requires multiple perspectives.")
with gr.Row():
with gr.Column(scale=1):
execution_mode = gr.Radio(choices=["Sequential", "Parallel"], value="Parallel", label="Execution Mode", info="Parallel mode is faster.")
with gr.Column(scale=1):
use_template = gr.Checkbox(label="Use Workflow Template", value=True, info="Automatically match to predefined workflows.")
with gr.Column(scale=2):
analyze_button = gr.Button("Analyze Problem", variant="primary", size="lg")
analysis_status = gr.Textbox(label="Analysis Status", interactive=False, lines=8, elem_classes=["status-text"])
with gr.Group():
gr.Markdown("### Analysis Visualizations")
with gr.Row():
workflow_graph = gr.Plot(label="Agent Collaboration Network")
with gr.Row():
with gr.Column():
timeline_chart = gr.Plot(label="Task Execution Timeline")
with gr.Column():
confidence_heatmap = gr.Plot(label="Agent Performance Metrics")
with gr.Row():
performance_chart = gr.Plot(label="Performance Comparison")
with gr.TabItem("Agent Details", id=2):
with gr.Group():
gr.Markdown("### View detailed information about each agent")
with gr.Row():
agent_selector = gr.Dropdown(choices=["Researcher-1", "Analyst-1", "Critic-1", "Synthesizer-1"], label="Select Agent", info="Choose an agent to view their profile.")
agent_details_button = gr.Button("Get Agent Details", variant="secondary")
agent_details_output = gr.Markdown(elem_classes=["markdown-text"])
with gr.Group():
gr.Markdown("### System Insights")
insights_button = gr.Button("Get Workflow Insights", variant="secondary")
insights_output = gr.Markdown(elem_classes=["markdown-text"])
with gr.TabItem("Report Generation", id=3):
with gr.Group():
gr.Markdown("### Generate comprehensive analysis report")
section_selector = gr.CheckboxGroup(
choices=["executive_summary", "task_analysis", "agent_contributions", "key_findings", "recommendations", "confidence_analysis", "performance_metrics"],
value=["executive_summary", "key_findings", "recommendations", "confidence_analysis"],
label="Select Report Sections", info="Choose which sections to include in the report."
)
generate_report_button = gr.Button("Generate Report", variant="primary", size="lg")
report_output = gr.Markdown(elem_classes=["markdown-text"])
with gr.TabItem("Example Problems", id=4):
gr.Markdown("""### Example Problems for Analysis
Click on any example to load it into the analysis tab. These examples demonstrate different types of complex problems suitable for multi-agent analysis.""")
example_problems = [
{"title": "Business Strategy", "problem": "Develop a comprehensive strategy for a traditional retail company to transition to e-commerce while maintaining customer loyalty and managing existing physical stores.", "description": "Complex business transformation requiring market analysis and strategic planning."},
{"title": "Technology Assessment", "problem": "Evaluate the potential risks and benefits of implementing blockchain technology in supply chain management for a global manufacturing company.", "description": "Technical evaluation requiring understanding of emerging technology and business operations."},
{"title": "Market Analysis", "problem": "Analyze the competitive landscape for electric vehicles and identify key success factors for new entrants in the North American market.", "description": "Market research requiring industry analysis and competitor assessment."},
{"title": "Policy Evaluation", "problem": "Assess the implications of remote work policies on organizational culture, productivity, and talent retention in technology companies.", "description": "Organizational analysis requiring understanding of HR, culture, and productivity."},
{"title": "Innovation Planning", "problem": "Design an innovation framework for a healthcare organization to integrate AI-powered diagnostic tools while ensuring patient privacy and regulatory compliance.", "description": "Innovation strategy requiring technical, regulatory, and ethical considerations."}
]
for i, example in enumerate(example_problems):
with gr.Group():
gr.Markdown(f"#### {example['title']}\n*{example['description']}*")
btn = gr.Button("Load This Example", variant="secondary", size="sm")
btn.click(fn=lambda p=example['problem']: p, outputs=problem_input).then(fn=lambda: gr.Tabs(selected=1), outputs=tabs)
# Event Handlers
init_button.click(
fn=initialize_agents,
inputs=[api_key_input, model_select, demo_mode_checkbox],
outputs=init_status
)
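        # analyze_problem is a coroutine; asyncio.run gives each click its own
        # event loop. Recent Gradio versions should also accept the async
        # function directly as fn.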
analyze_button.click(
fn=lambda p, m, t: asyncio.run(analyze_problem(p, m, t)),
inputs=[problem_input, execution_mode, use_template],
outputs=[analysis_status, workflow_graph, timeline_chart, confidence_heatmap, performance_chart]
)
agent_details_button.click(
fn=get_agent_details,
inputs=agent_selector,
outputs=agent_details_output
)
insights_button.click(
fn=get_workflow_insights,
inputs=[],
outputs=insights_output
)
generate_report_button.click(
fn=generate_report,
inputs=section_selector,
outputs=report_output
)
return interface
if __name__ == "__main__":
interface = create_gradio_interface()
interface.launch(share=True, show_error=True)