"""Multi-agent AI collaboration platform: specialized LLM agents (researcher,
analyst, critic, synthesizer) coordinated over a shared workflow, with
performance tracking, visualization, a Gradio UI, and PDF report generation."""

import os
import json
import time
import asyncio
import hashlib
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional, Any, Union, Set
from dataclasses import dataclass, field
from enum import Enum
import warnings

warnings.filterwarnings('ignore')

import gradio as gr
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

from langchain.schema import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferMemory
from pydantic import BaseModel, Field

from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch

import aiohttp
from concurrent.futures import ThreadPoolExecutor, as_completed

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class Config:
    """Configuration settings for the multi-agent system."""

    # LLM settings
    DEFAULT_MODEL = "gpt-4"
    TEMPERATURE = 0.7
    MAX_TOKENS = 1500

    # Collaboration settings
    MAX_ITERATIONS = 10
    COLLABORATION_TIMEOUT = 300  # seconds

    # Visualization settings
    GRAPH_UPDATE_INTERVAL = 0.5  # seconds
    NODE_COLORS = {
        'Researcher': '#3498db',
        'Analyst': '#e74c3c',
        'Critic': '#f39c12',
        'Synthesizer': '#2ecc71',
        'Coordinator': '#9b59b6'
    }

    # Performance benchmarking
    ENABLE_PERFORMANCE_TRACKING = True
    BENCHMARK_BASELINE = {
        "single_agent_time": 45.0,  # seconds
        "single_agent_quality": 0.72
    }

    # Report settings
    CONFIDENCE_THRESHOLD = 0.7
    MAX_REPORT_SECTIONS = 10
    COMPANY_NAME = "Multi-Agent AI Platform"

    # Demo mode (agents simulate output when no LLM is configured)
    DEMO_MODE_ENABLED = True
    DEMO_PROBLEMS = [
        "Analyze the impact of remote work on team productivity and collaboration",
        "Develop a strategy for sustainable urban transportation",
        "Evaluate the risks and benefits of AI in healthcare",
        "Design a framework for ethical AI development",
        "Create a plan for digital transformation in education"
    ]


class AgentRole(Enum):
    """Enumeration of agent roles in the system."""
    RESEARCHER = "Researcher"
    ANALYST = "Analyst"
    CRITIC = "Critic"
    SYNTHESIZER = "Synthesizer"
    COORDINATOR = "Coordinator"


class TaskStatus(Enum):
    """Task execution status."""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


class MessageType(Enum):
    """Types of messages between agents."""
    TASK_ASSIGNMENT = "task_assignment"
    COLLABORATION_REQUEST = "collaboration_request"
    INFORMATION_SHARING = "information_sharing"
    FEEDBACK = "feedback"
    COMPLETION_REPORT = "completion_report"


@dataclass
class Task:
    """Represents a task to be executed by agents."""
    id: str
    description: str
    assigned_to: Optional[str] = None
    status: TaskStatus = TaskStatus.PENDING
    dependencies: List[str] = field(default_factory=list)
    result: Optional[Any] = None
    confidence: float = 0.0
    created_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    performance_metrics: Dict[str, float] = field(default_factory=dict)


@dataclass
class AgentMessage:
    """Message passed between agents."""
    sender: str
    recipient: str
    content: str
    message_type: MessageType
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
    priority: int = 1  # messages with priority >= 4 are treated as high priority
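
# Illustrative sketch (not part of the pipeline): constructing the two core
# records above. The names and task text here are invented for demonstration.
#
#   task = Task(id="task_1",
#               description="Research remote-work productivity data")
#   msg = AgentMessage(sender="Coordinator-1", recipient="Researcher-1",
#                      content=task.description,
#                      message_type=MessageType.TASK_ASSIGNMENT,
#                      priority=4)  # >= 4 surfaces in get_high_priority_messages()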


class PerformanceTracker:
    """Tracks performance metrics for the multi-agent system."""

    def __init__(self):
        self.metrics = {
            'task_completion_times': [],
            'agent_utilization': {},
            'collaboration_count': 0,
            'total_messages': 0,
            'quality_scores': [],
            'system_start_time': None,
            'system_end_time': None
        }

    def start_tracking(self):
        """Start performance tracking."""
        self.metrics['system_start_time'] = datetime.now()

    def end_tracking(self):
        """End performance tracking."""
        self.metrics['system_end_time'] = datetime.now()

    def record_task_completion(self, task: Task):
        """Record task completion metrics."""
        if task.created_at and task.completed_at:
            completion_time = (task.completed_at - task.created_at).total_seconds()
            self.metrics['task_completion_times'].append(completion_time)

    def record_agent_activity(self, agent_name: str, activity_duration: float):
        """Record agent activity duration."""
        if agent_name not in self.metrics['agent_utilization']:
            self.metrics['agent_utilization'][agent_name] = 0
        self.metrics['agent_utilization'][agent_name] += activity_duration

    def record_collaboration(self):
        """Record a collaboration event."""
        self.metrics['collaboration_count'] += 1

    def record_message(self):
        """Record a message exchange."""
        self.metrics['total_messages'] += 1

    def get_performance_summary(self) -> Dict[str, Any]:
        """Get performance summary statistics."""
        total_time = 0
        if self.metrics['system_start_time'] and self.metrics['system_end_time']:
            total_time = (self.metrics['system_end_time'] -
                          self.metrics['system_start_time']).total_seconds()

        avg_task_time = np.mean(self.metrics['task_completion_times']) if self.metrics['task_completion_times'] else 0

        # Improvement relative to the single-agent baseline
        baseline_time = Config.BENCHMARK_BASELINE['single_agent_time']
        time_improvement = ((baseline_time - avg_task_time) / baseline_time * 100) if avg_task_time > 0 else 0

        return {
            'total_execution_time': total_time,
            'average_task_completion_time': avg_task_time,
            'total_collaborations': self.metrics['collaboration_count'],
            'total_messages': self.metrics['total_messages'],
            'agent_utilization': self.metrics['agent_utilization'],
            'time_improvement_percentage': time_improvement,
            'efficiency_score': self._calculate_efficiency_score()
        }

    def _calculate_efficiency_score(self) -> float:
        """Calculate overall efficiency score."""
        factors = []

        # Speed: average task time versus the single-agent baseline
        if self.metrics['task_completion_times']:
            avg_time = np.mean(self.metrics['task_completion_times'])
            speed_factor = min(1.0, Config.BENCHMARK_BASELINE['single_agent_time'] / avg_time)
            factors.append(speed_factor)

        # Collaboration: share of message traffic that was collaborative
        if self.metrics['total_messages'] > 0:
            collab_factor = min(1.0, self.metrics['collaboration_count'] / (self.metrics['total_messages'] * 0.3))
            factors.append(collab_factor)

        # Balance: how evenly work is spread across agents
        if self.metrics['agent_utilization']:
            utilization_values = list(self.metrics['agent_utilization'].values())
            if utilization_values:
                avg_utilization = np.mean(utilization_values)
                max_utilization = max(utilization_values)
                balance_factor = avg_utilization / max_utilization if max_utilization > 0 else 0
                factors.append(balance_factor)

        return np.mean(factors) if factors else 0.5
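
# Illustrative sketch: a typical tracker lifecycle. The numbers are invented.
#
#   tracker = PerformanceTracker()
#   tracker.start_tracking()
#   tracker.record_agent_activity("Researcher-1", 12.5)
#   tracker.record_collaboration()
#   tracker.record_message()
#   tracker.end_tracking()
#   summary = tracker.get_performance_summary()
#   # summary['efficiency_score'] averages the speed, collaboration,
#   # and load-balance factors computed above.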


class AgentMemory:
    """Manages agent conversation history and context."""

    def __init__(self, max_messages: int = 50):
        self.messages: List[AgentMessage] = []
        self.max_messages = max_messages
        self.context: Dict[str, Any] = {}
        self.knowledge_base: Dict[str, Any] = {}

    def add_message(self, message: AgentMessage):
        """Add a message to memory, evicting the oldest when full."""
        self.messages.append(message)
        if len(self.messages) > self.max_messages:
            self.messages.pop(0)

        # Index any important knowledge the message carries
        self._extract_knowledge(message)

    def get_recent_messages(self, n: int = 10) -> List[AgentMessage]:
        """Get the n most recent messages."""
        return self.messages[-n:]

    def get_messages_by_sender(self, sender: str) -> List[AgentMessage]:
        """Get all messages from a specific sender."""
        return [msg for msg in self.messages if msg.sender == sender]

    def get_high_priority_messages(self) -> List[AgentMessage]:
        """Get high-priority messages (priority >= 4)."""
        return [msg for msg in self.messages if msg.priority >= 4]

    def update_context(self, key: str, value: Any):
        """Update context information."""
        self.context[key] = value

    def get_context(self, key: str) -> Any:
        """Get context information."""
        return self.context.get(key)

    def _extract_knowledge(self, message: AgentMessage):
        """Extract and store important knowledge from messages."""
        keywords = ['finding', 'conclusion', 'recommendation', 'insight', 'pattern']
        content_lower = message.content.lower()

        for keyword in keywords:
            if keyword in content_lower:
                knowledge_key = f"{message.sender}_{keyword}_{len(self.knowledge_base)}"
                self.knowledge_base[knowledge_key] = {
                    'content': message.content,
                    'sender': message.sender,
                    'timestamp': message.timestamp,
                    'type': keyword
                }
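
# Illustrative sketch: keyword-triggered knowledge extraction. The message
# below contains "finding", so exactly one knowledge_base entry is created.
#
#   mem = AgentMemory(max_messages=10)
#   mem.add_message(AgentMessage(sender="Researcher-1", recipient="Analyst-1",
#                                content="Key finding: adoption rates are rising.",
#                                message_type=MessageType.INFORMATION_SHARING))
#   assert len(mem.knowledge_base) == 1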


class BaseAgent:
    """Base class for all AI agents in the system."""

    def __init__(self, name: str, role: AgentRole, llm: Optional[ChatOpenAI] = None):
        self.name = name
        self.role = role
        self.llm = llm
        self.memory = AgentMemory()
        self.active = True
        self.current_task: Optional[Task] = None
        self.completed_tasks: List[Task] = []
        self.performance_tracker = PerformanceTracker()
        self.collaboration_partners: Set[str] = set()

    async def process_task(self, task: Task) -> Task:
        """Process a task and return the result."""
        self.current_task = task
        task.status = TaskStatus.IN_PROGRESS
        task.assigned_to = self.name

        start_time = datetime.now()

        try:
            # Use the real LLM when available; otherwise fall back to simulation
            if self.llm:
                result = await self._execute_task(task)
            else:
                result = await self._simulate_task_execution(task)

            task.result = result
            task.status = TaskStatus.COMPLETED
            task.completed_at = datetime.now()
            task.confidence = self._calculate_confidence(result)

            # Record performance metrics
            execution_time = (task.completed_at - start_time).total_seconds()
            task.performance_metrics['execution_time'] = execution_time
            task.performance_metrics['confidence'] = task.confidence

            self.completed_tasks.append(task)
            self.performance_tracker.record_task_completion(task)

        except Exception as e:
            logger.error(f"Agent {self.name} failed to process task {task.id}: {str(e)}")
            task.status = TaskStatus.FAILED
            task.result = f"Error: {str(e)}"
            task.confidence = 0.0

        finally:
            self.current_task = None

        return task

    async def _execute_task(self, task: Task) -> Any:
        """Execute the task - to be implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement _execute_task")

    async def _simulate_task_execution(self, task: Task) -> Any:
        """Simulate task execution for demo mode."""
        # Simulate processing time
        await asyncio.sleep(np.random.uniform(2, 4))

        # Role-specific canned outputs used when no LLM is configured
        simulation_templates = {
            AgentRole.RESEARCHER: {
                "findings": f"Comprehensive research on '{task.description}' reveals multiple perspectives and critical data points.",
                "sources": ["Academic studies", "Industry reports", "Expert interviews", "Market analysis"],
                "key_points": [
                    "Significant trends identified in the domain",
                    "Multiple stakeholder perspectives considered",
                    "Historical context provides important insights",
                    "Current state analysis reveals opportunities",
                    "Future projections indicate growth potential"
                ],
                "data_collected": {
                    "quantitative": "Statistical analysis of 500+ data points",
                    "qualitative": "In-depth interviews with 20 experts"
                },
                "research_quality_score": 0.92
            },
            AgentRole.ANALYST: {
                "analysis": f"Detailed analysis of '{task.description}' reveals clear patterns and actionable insights.",
                "patterns": [
                    {"description": "Upward trend in adoption rates", "type": "trend", "confidence": 0.89},
                    {"description": "Strong correlation between factors X and Y", "type": "correlation", "confidence": 0.91},
                    {"description": "Seasonal variations detected", "type": "cyclical", "confidence": 0.87}
                ],
                "insights": [
                    "Data suggests strong positive outcomes with 85% confidence",
                    "Multiple factors contribute to observed patterns",
                    "Strategic opportunities identified in 3 key areas",
                    "Risk factors are manageable with proper mitigation"
                ],
                "recommendations": [
                    {"recommendation": "Implement phased approach", "priority": "high"},
                    {"recommendation": "Focus on high-impact areas first", "priority": "high"},
                    {"recommendation": "Monitor key metrics continuously", "priority": "medium"}
                ],
                "confidence_metrics": {"overall_confidence": 0.88, "data_quality": 0.90}
            },
            AgentRole.CRITIC: {
                "evaluation": f"Critical evaluation of '{task.description}' identifies strengths and areas for improvement.",
                "strengths": [
                    {"strength": "Comprehensive data coverage", "category": "methodology", "impact": "high"},
                    {"strength": "Well-structured analysis approach", "category": "process", "impact": "high"},
                    {"strength": "Clear evidence supporting conclusions", "category": "evidence", "impact": "medium"}
                ],
                "weaknesses": [
                    {"weakness": "Limited geographic scope", "severity": "medium", "category": "completeness"},
                    {"weakness": "Some assumptions require validation", "severity": "low", "category": "methodology"}
                ],
                "gaps": [
                    "Additional longitudinal data would strengthen conclusions",
                    "Competitive analysis could be expanded"
                ],
                "improvements": [
                    {"suggestion": "Include more diverse data sources", "priority": "high", "effort": "medium"},
                    {"suggestion": "Validate assumptions with field testing", "priority": "medium", "effort": "high"}
                ],
                "quality_score": {"overall": 0.85, "breakdown": {"accuracy": 0.88, "completeness": 0.82, "logic": 0.90}}
            },
            AgentRole.SYNTHESIZER: {
                "synthesis": f"Comprehensive synthesis for '{task.description}' integrates all findings into actionable strategy.",
                "key_themes": [
                    {"theme": "Digital transformation opportunity", "description": "Strong potential for technology adoption", "importance": "high"},
                    {"theme": "Customer-centric approach", "description": "Focus on user experience drives success", "importance": "high"},
                    {"theme": "Phased implementation", "description": "Gradual rollout minimizes risk", "importance": "medium"}
                ],
                "consensus_points": [
                    {"point": "All agents agree on strategic direction", "strength": "strong"},
                    {"point": "Timeline expectations are aligned", "strength": "strong"},
                    {"point": "Resource requirements are reasonable", "strength": "moderate"}
                ],
                "final_recommendations": [
                    {"recommendation": "Launch pilot program in Q1", "priority": "high", "timeframe": "immediate"},
                    {"recommendation": "Establish KPI dashboard", "priority": "high", "timeframe": "immediate"},
                    {"recommendation": "Build stakeholder coalition", "priority": "medium", "timeframe": "short-term"},
                    {"recommendation": "Develop training programs", "priority": "medium", "timeframe": "medium-term"}
                ],
                "executive_summary": "Based on comprehensive multi-agent analysis, we recommend a phased approach to implementation with focus on quick wins and risk mitigation. The strategy balances innovation with practical considerations.",
                "action_items": [
                    {"action": "Form implementation task force", "owner": "Leadership", "deadline": "2 weeks"},
                    {"action": "Develop detailed project plan", "owner": "PMO", "deadline": "1 month"},
                    {"action": "Secure budget approval", "owner": "Finance", "deadline": "1 month"}
                ],
                "confidence_level": {"overall": 0.91, "factors": {"evidence_strength": True, "consensus_level": True}}
            }
        }

        return simulation_templates.get(self.role, {"result": "Task completed successfully"})

    def _calculate_confidence(self, result: Any) -> float:
        """Calculate confidence score for the result."""
        base_confidence = 0.7

        if result and isinstance(result, dict):
            # Prefer an explicit confidence metric when the result provides one
            if 'confidence_metrics' in result:
                return result['confidence_metrics'].get('overall_confidence', base_confidence)

            # Completeness: how many expected top-level keys are present
            expected_keys = {'findings', 'analysis', 'evaluation', 'synthesis'}
            actual_keys = set(result.keys())
            completeness = len(actual_keys.intersection(expected_keys)) / len(expected_keys)

            # Depth: reward longer, more substantive content (capped)
            content_length = sum(len(str(v)) for v in result.values() if isinstance(v, (str, list)))
            length_factor = min(1.0, content_length / 1000)

            # Bonus when the result carries its own quality indicators
            quality_indicators = ['quality_score', 'confidence_level', 'research_quality_score']
            quality_bonus = 0.1 if any(ind in result for ind in quality_indicators) else 0

            confidence = base_confidence + (completeness * 0.15) + (length_factor * 0.1) + quality_bonus
            return min(0.95, confidence)

        return base_confidence

    async def collaborate(self, other_agent: 'BaseAgent', message: AgentMessage) -> AgentMessage:
        """Handle collaboration with another agent."""
        self.memory.add_message(message)
        self.collaboration_partners.add(other_agent.name)
        self.performance_tracker.record_collaboration()

        # Generate a role-appropriate response
        response_content = await self._process_collaboration(message)

        # Deliver the response to the other agent
        response_message = AgentMessage(
            sender=self.name,
            recipient=other_agent.name,
            content=response_content,
            message_type=MessageType.INFORMATION_SHARING,
            priority=message.priority
        )

        other_agent.memory.add_message(response_message)
        self.performance_tracker.record_message()

        return response_message

    async def _process_collaboration(self, message: AgentMessage) -> str:
        """Process a collaboration message."""
        # Role-specific response templates
        role_responses = {
            AgentRole.RESEARCHER: f"Research findings indicate: Based on my investigation, {message.content} aligns with current data trends.",
            AgentRole.ANALYST: f"Analytical perspective: The patterns I've identified support {message.content} with 87% confidence.",
            AgentRole.CRITIC: f"Critical assessment: While {message.content} has merit, we should also consider potential risks.",
            AgentRole.SYNTHESIZER: f"Synthesis observation: Integrating {message.content} into our comprehensive strategy."
        }

        return role_responses.get(self.role, f"{self.name} acknowledges: {message.content}")

    def get_status_summary(self) -> Dict[str, Any]:
        """Get current status summary of the agent."""
        return {
            'name': self.name,
            'role': self.role.value,
            'active': self.active,
            'current_task': self.current_task.description if self.current_task else None,
            'completed_tasks': len(self.completed_tasks),
            'average_confidence': np.mean([t.confidence for t in self.completed_tasks]) if self.completed_tasks else 0,
            'collaboration_count': len(self.collaboration_partners),
            'memory_size': len(self.memory.messages)
        }
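
# Illustrative sketch: a demo-mode run (llm=None), so process_task falls back
# to _simulate_task_execution. The agent name and task text are invented.
#
#   async def _demo():
#       agent = BaseAgent("Demo-1", AgentRole.RESEARCHER)
#       task = Task(id="demo_1",
#                   description="Evaluate the risks and benefits of AI in healthcare")
#       done = await agent.process_task(task)
#       print(done.status, round(done.confidence, 2))  # TaskStatus.COMPLETED ...
#
#   asyncio.run(_demo())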


class ResearcherAgent(BaseAgent):
    """Agent specialized in researching and gathering information."""

    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
        super().__init__(name, AgentRole.RESEARCHER, llm)
        self.research_sources: List[str] = []
        self.research_methods = ["literature_review", "data_collection", "expert_consultation", "field_research"]

    async def _execute_task(self, task: Task) -> Any:
        """Execute a research task."""
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="""You are a Research Agent specializing in gathering comprehensive information.
Your role is to:
1. Break down complex topics into research questions
2. Identify key information sources and data points
3. Provide detailed, factual information with citations where possible
4. Flag areas requiring further investigation
5. Maintain objectivity and consider multiple perspectives"""),
            HumanMessage(content=f"Research the following: {task.description}")
        ])

        response = await self.llm.ainvoke(prompt.format_messages())

        # Structure the raw LLM output into a research result
        research_result = {
            "findings": response.content,
            "sources": self._extract_sources(response.content),
            "key_points": self._extract_key_points(response.content),
            "areas_for_investigation": self._identify_gaps(response.content),
            "research_quality_score": self._assess_research_quality(response.content)
        }

        # Cache for later collaboration
        self.memory.update_context('latest_research', research_result)

        return research_result

    def _extract_sources(self, content: str) -> List[str]:
        """Extract potential sources from research content."""
        sources = []
        source_indicators = ['source:', 'reference:', 'based on:', 'according to', 'study:', 'report:']

        lines = content.split('\n')
        for line in lines:
            line_lower = line.lower()
            for indicator in source_indicators:
                if indicator in line_lower:
                    sources.append(line.strip())
                    break

        return sources[:10]

    def _extract_key_points(self, content: str) -> List[str]:
        """Extract key points from research."""
        key_points = []
        lines = content.split('\n')

        for line in lines:
            line = line.strip()
            # Numbered or bulleted lines are treated as key points
            if line and (line[0].isdigit() or line.startswith('-') or line.startswith('•')):
                key_points.append(line)
            # So are lines flagged with emphasis phrases
            elif any(phrase in line.lower() for phrase in ['key finding:', 'important:', 'notably:']):
                key_points.append(line)

        return key_points[:15]

    def _identify_gaps(self, content: str) -> List[str]:
        """Identify areas needing more research."""
        gaps = []
        gap_indicators = ['unclear', 'requires further', 'need more', 'investigate',
                          'unknown', 'limited data', 'insufficient evidence']

        sentences = content.split('.')
        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(indicator in sentence_lower for indicator in gap_indicators):
                gaps.append(sentence.strip() + '.')

        return gaps[:5]

    def _assess_research_quality(self, content: str) -> float:
        """Assess the quality of research output."""
        quality_score = 0.5

        # Sources cited
        if self._extract_sources(content):
            quality_score += 0.15

        # Key points identified
        if self._extract_key_points(content):
            quality_score += 0.15

        # Sufficient depth
        word_count = len(content.split())
        if word_count > 300:
            quality_score += 0.1

        # Analytical vocabulary present
        analytical_terms = ['analysis', 'evaluation', 'comparison', 'contrast', 'implication']
        if any(term in content.lower() for term in analytical_terms):
            quality_score += 0.1

        return min(1.0, quality_score)
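
# Illustrative sketch: wiring a real LLM into an agent (assumes an
# OPENAI_API_KEY in the environment; model settings come from Config).
#
#   llm = ChatOpenAI(model=Config.DEFAULT_MODEL,
#                    temperature=Config.TEMPERATURE,
#                    max_tokens=Config.MAX_TOKENS)
#   researcher = ResearcherAgent("Researcher-1", llm=llm)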


class AnalystAgent(BaseAgent):
    """Agent specialized in analyzing data and identifying patterns."""

    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
        super().__init__(name, AgentRole.ANALYST, llm)
        self.analysis_methods = ["statistical", "comparative", "trend", "causal", "predictive"]
        self.analysis_frameworks = ["SWOT", "PESTLE", "Porter's Five Forces", "Cost-Benefit"]

    async def _execute_task(self, task: Task) -> Any:
        """Execute an analysis task."""
        # Pull any relevant prior findings from memory
        context = self._get_relevant_context(task)

        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="""You are an Analyst Agent specializing in data analysis and pattern recognition.
Your role is to:
1. Analyze information systematically and objectively
2. Identify patterns, trends, and correlations
3. Provide quantitative insights where possible
4. Draw logical conclusions based on evidence
5. Apply appropriate analytical frameworks
6. Consider multiple analytical perspectives"""),
            HumanMessage(content=f"Analyze the following: {task.description}\n\nContext: {context}")
        ])

        response = await self.llm.ainvoke(prompt.format_messages())

        # Structure the raw LLM output into an analysis result
        analysis_result = {
            "analysis": response.content,
            "patterns": self._identify_patterns(response.content),
            "insights": self._extract_insights(response.content),
            "recommendations": self._generate_recommendations(response.content),
            "confidence_metrics": self._calculate_analysis_confidence(response.content),
            "analytical_framework": self._identify_framework_used(response.content)
        }

        # Cache for later collaboration
        self.memory.update_context('latest_analysis', analysis_result)

        return analysis_result

    def _get_relevant_context(self, task: Task) -> str:
        """Get relevant context from memory for the task."""
        context_items = []

        # Recent messages that mention the task
        recent_messages = self.memory.get_recent_messages(5)
        for msg in recent_messages:
            if task.description.lower() in msg.content.lower():
                context_items.append(f"Previous finding: {msg.content[:200]}...")

        # Findings and insights from the knowledge base
        for key, knowledge in self.memory.knowledge_base.items():
            if 'finding' in knowledge['type'] or 'insight' in knowledge['type']:
                context_items.append(f"Known insight: {knowledge['content'][:200]}...")

        return "\n".join(context_items[:3])

    def _identify_patterns(self, content: str) -> List[Dict[str, str]]:
        """Identify patterns in the analysis."""
        patterns = []
        pattern_types = {
            'trend': ['trend', 'increasing', 'decreasing', 'growth', 'decline'],
            'correlation': ['correlation', 'relationship', 'associated', 'linked'],
            'cyclical': ['cycle', 'periodic', 'seasonal', 'recurring'],
            'anomaly': ['anomaly', 'outlier', 'unusual', 'exceptional']
        }

        sentences = content.split('.')
        for sentence in sentences:
            sentence_lower = sentence.lower()
            for pattern_type, keywords in pattern_types.items():
                if any(keyword in sentence_lower for keyword in keywords):
                    patterns.append({
                        "description": sentence.strip() + '.',
                        "type": pattern_type,
                        "confidence": 0.8
                    })
                    break

        return patterns[:8]

    def _extract_insights(self, content: str) -> List[str]:
        """Extract key insights from analysis."""
        insights = []
        insight_indicators = ['shows', 'indicates', 'suggests', 'reveals',
                              'demonstrates', 'implies', 'means that', 'therefore']

        sentences = content.split('.')
        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(indicator in sentence_lower for indicator in insight_indicators):
                insights.append(sentence.strip() + '.')

        return insights[:10]

    def _generate_recommendations(self, content: str) -> List[Dict[str, str]]:
        """Generate recommendations based on analysis."""
        recommendations = []
        rec_indicators = ['recommend', 'suggest', 'should', 'consider',
                          'advise', 'propose', 'it would be beneficial']

        sentences = content.split('.')
        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(indicator in sentence_lower for indicator in rec_indicators):
                recommendations.append({
                    "recommendation": sentence.strip() + '.',
                    "priority": "high" if any(word in sentence_lower for word in ['critical', 'essential', 'must']) else "medium"
                })

        return recommendations[:7]

    def _calculate_analysis_confidence(self, content: str) -> Dict[str, float]:
        """Calculate confidence metrics for the analysis."""
        # Count evidence-bearing vs. hedging vocabulary
        evidence_count = sum(content.lower().count(word) for word in ['evidence', 'data', 'shows', 'proves'])
        uncertainty_count = sum(content.lower().count(word) for word in ['may', 'might', 'possibly', 'perhaps'])

        # Normalize counts into [0, 1] factors
        evidence_strength = min(1.0, evidence_count / 10)
        certainty_level = max(0.0, 1.0 - (uncertainty_count / 10))

        # Reward quantitative language
        quantitative_indicators = ['percentage', '%', 'ratio', 'correlation', 'statistical']
        quantitative_score = 0.7 if any(ind in content.lower() for ind in quantitative_indicators) else 0.5

        overall_confidence = (evidence_strength + certainty_level + quantitative_score) / 3

        return {
            "overall_confidence": overall_confidence,
            "evidence_strength": evidence_strength,
            "certainty_level": certainty_level,
            "quantitative_score": quantitative_score
        }

    def _identify_framework_used(self, content: str) -> Optional[str]:
        """Identify which analytical framework was used."""
        content_lower = content.lower()

        for framework in self.analysis_frameworks:
            if framework.lower() in content_lower:
                return framework

        # Infer the framework from its characteristic vocabulary
        if all(word in content_lower for word in ['strength', 'weakness', 'opportunity', 'threat']):
            return "SWOT"
        elif any(word in content_lower for word in ['political', 'economic', 'social', 'technological']):
            return "PESTLE"

        return None
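
# Illustrative sketch: how _calculate_analysis_confidence scores a snippet.
# With 2 evidence words, 1 hedge word, and a '%' sign present:
#   evidence_strength  = min(1.0, 2 / 10)        = 0.2
#   certainty_level    = max(0.0, 1.0 - 1 / 10)  = 0.9
#   quantitative_score = 0.7
#   overall_confidence = (0.2 + 0.9 + 0.7) / 3   = 0.6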


class CriticAgent(BaseAgent):
    """Agent specialized in critical evaluation and quality assurance."""

    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
        super().__init__(name, AgentRole.CRITIC, llm)
        self.evaluation_criteria = [
            "accuracy", "completeness", "logic", "evidence",
            "clarity", "relevance", "consistency", "objectivity"
        ]
        self.evaluation_rubric = self._create_evaluation_rubric()

    def _create_evaluation_rubric(self) -> Dict[str, Dict[str, float]]:
        """Create an evaluation rubric with weighted criteria (weights sum to 1.0)."""
        return {
            "accuracy": {"weight": 0.20, "score": 0.0},
            "completeness": {"weight": 0.15, "score": 0.0},
            "logic": {"weight": 0.15, "score": 0.0},
            "evidence": {"weight": 0.15, "score": 0.0},
            "clarity": {"weight": 0.10, "score": 0.0},
            "relevance": {"weight": 0.10, "score": 0.0},
            "consistency": {"weight": 0.10, "score": 0.0},
            "objectivity": {"weight": 0.05, "score": 0.0}
        }

    async def _execute_task(self, task: Task) -> Any:
        """Execute a critical evaluation task."""
        # Collect the material to be evaluated
        evaluation_context = self._gather_evaluation_context(task)

        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="""You are a Critic Agent specializing in rigorous evaluation and quality assurance.
Your role is to:
1. Critically evaluate arguments and conclusions
2. Identify weaknesses, gaps, and potential biases
3. Verify logical consistency and evidence quality
4. Suggest improvements and alternative perspectives
5. Ensure high standards of analysis
6. Apply systematic evaluation criteria
7. Provide constructive feedback"""),
            HumanMessage(content=f"Critically evaluate the following: {task.description}\n\nContent to evaluate: {evaluation_context}")
        ])

        response = await self.llm.ainvoke(prompt.format_messages())

        # Structure the raw LLM output into a critique result
        critique_result = {
            "evaluation": response.content,
            "strengths": self._identify_strengths(response.content),
            "weaknesses": self._identify_weaknesses(response.content),
            "gaps": self._identify_gaps(response.content),
            "improvements": self._suggest_improvements(response.content),
            "quality_score": self._calculate_quality_score(response.content),
            "alternative_perspectives": self._identify_alternatives(response.content),
            "final_verdict": self._generate_verdict(response.content)
        }

        # Cache for later reference
        self.memory.update_context('evaluation_history', critique_result)

        return critique_result

    def _gather_evaluation_context(self, task: Task) -> str:
        """Gather relevant context for evaluation."""
        context_items = []

        # Reports and shared findings from other agents
        recent_messages = self.memory.get_recent_messages(10)
        for msg in recent_messages:
            if msg.message_type in [MessageType.COMPLETION_REPORT, MessageType.INFORMATION_SHARING]:
                context_items.append(f"{msg.sender}: {msg.content[:300]}...")

        # Findings, conclusions, and insights from the knowledge base
        for key, knowledge in self.memory.knowledge_base.items():
            if knowledge['type'] in ['finding', 'conclusion', 'insight']:
                context_items.append(f"Previous {knowledge['type']}: {knowledge['content'][:200]}...")

        return "\n\n".join(context_items[:5])

    def _identify_strengths(self, content: str) -> List[Dict[str, str]]:
        """Identify strengths in the evaluated content."""
        strengths = []
        strength_indicators = ['strong', 'excellent', 'well', 'good', 'effective',
                               'solid', 'robust', 'comprehensive', 'thorough']

        sentences = content.split('.')
        for sentence in sentences:
            sentence_lower = sentence.lower()
            for indicator in strength_indicators:
                if indicator in sentence_lower:
                    strengths.append({
                        "strength": sentence.strip() + '.',
                        "category": self._categorize_strength(sentence),
                        "impact": "high" if any(word in sentence_lower for word in ['very', 'extremely', 'highly']) else "medium"
                    })
                    break

        return strengths[:6]

    def _categorize_strength(self, sentence: str) -> str:
        """Categorize the type of strength identified."""
        sentence_lower = sentence.lower()

        if any(word in sentence_lower for word in ['method', 'approach', 'framework']):
            return "methodology"
        elif any(word in sentence_lower for word in ['data', 'evidence', 'support']):
            return "evidence"
        elif any(word in sentence_lower for word in ['logic', 'reasoning', 'argument']):
            return "reasoning"
        elif any(word in sentence_lower for word in ['clear', 'organized', 'structured']):
            return "presentation"
        else:
            return "general"

    def _identify_weaknesses(self, content: str) -> List[Dict[str, str]]:
        """Identify weaknesses in the evaluated content."""
        weaknesses = []
        weakness_indicators = ['weak', 'lack', 'insufficient', 'poor', 'inadequate',
                               'missing', 'limited', 'unclear', 'vague']

        sentences = content.split('.')
        for sentence in sentences:
            sentence_lower = sentence.lower()
            for indicator in weakness_indicators:
                if indicator in sentence_lower:
                    weaknesses.append({
                        "weakness": sentence.strip() + '.',
                        "severity": self._assess_severity(sentence),
                        "category": self._categorize_weakness(sentence)
                    })
                    break

        return weaknesses[:6]

    def _assess_severity(self, sentence: str) -> str:
        """Assess the severity of a weakness."""
        sentence_lower = sentence.lower()

        if any(word in sentence_lower for word in ['critical', 'severe', 'major', 'significant']):
            return "high"
        elif any(word in sentence_lower for word in ['moderate', 'some', 'partial']):
            return "medium"
        else:
            return "low"

    def _categorize_weakness(self, sentence: str) -> str:
        """Categorize the type of weakness identified."""
        sentence_lower = sentence.lower()

        if any(word in sentence_lower for word in ['data', 'evidence', 'support']):
            return "evidence"
        elif any(word in sentence_lower for word in ['logic', 'reasoning', 'argument']):
            return "reasoning"
        elif any(word in sentence_lower for word in ['bias', 'objective', 'neutral']):
            return "objectivity"
        elif any(word in sentence_lower for word in ['complete', 'comprehensive', 'thorough']):
            return "completeness"
        else:
            return "general"

    def _identify_gaps(self, content: str) -> List[str]:
        """Identify gaps in the analysis."""
        gaps = []
        gap_indicators = ['gap', 'missing', 'overlook', 'fail to', 'does not address',
                          'ignores', 'omits', 'neglects']

        sentences = content.split('.')
        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(indicator in sentence_lower for indicator in gap_indicators):
                gaps.append(sentence.strip() + '.')

        return gaps[:5]

    def _suggest_improvements(self, content: str) -> List[Dict[str, str]]:
        """Suggest improvements based on the critique."""
        improvements = []
        improvement_indicators = ['could', 'should', 'improve', 'enhance',
                                  'strengthen', 'add', 'consider', 'recommend']

        sentences = content.split('.')
        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(indicator in sentence_lower for indicator in improvement_indicators):
                improvements.append({
                    "suggestion": sentence.strip() + '.',
                    "priority": self._prioritize_improvement(sentence),
                    "effort": self._estimate_effort(sentence)
                })

        return improvements[:7]

    def _prioritize_improvement(self, sentence: str) -> str:
        """Prioritize improvement suggestions."""
        sentence_lower = sentence.lower()

        if any(word in sentence_lower for word in ['critical', 'essential', 'must', 'urgent']):
            return "high"
        elif any(word in sentence_lower for word in ['should', 'important', 'recommend']):
            return "medium"
        else:
            return "low"

    def _estimate_effort(self, sentence: str) -> str:
        """Estimate the effort required for an improvement."""
        sentence_lower = sentence.lower()

        if any(word in sentence_lower for word in ['simple', 'easy', 'quick', 'minor']):
            return "low"
        elif any(word in sentence_lower for word in ['moderate', 'some', 'reasonable']):
            return "medium"
        elif any(word in sentence_lower for word in ['significant', 'substantial', 'major']):
            return "high"
        else:
            return "medium"

    def _calculate_quality_score(self, content: str) -> Dict[str, float]:
        """Calculate detailed quality scores."""
        # Copy the rubric per call; a shallow .copy() would share the inner
        # dicts and let scores leak between evaluations.
        scores = {criterion: dict(values) for criterion, values in self.evaluation_rubric.items()}
        content_lower = content.lower()

        # Simple keyword-driven scoring per criterion
        for criterion in self.evaluation_criteria:
            score = 0.5

            # Positive signals raise the score
            if criterion in content_lower and any(word in content_lower for word in ['good', 'strong', 'excellent']):
                score += 0.3

            # Negative signals lower it
            if criterion in content_lower and any(word in content_lower for word in ['poor', 'weak', 'lacking']):
                score -= 0.3

            scores[criterion]["score"] = max(0.0, min(1.0, score))

        # Weighted overall score
        overall = sum(scores[c]["score"] * scores[c]["weight"] for c in scores)

        return {
            "overall": overall,
            "breakdown": {c: scores[c]["score"] for c in scores},
            "grade": self._convert_to_grade(overall)
        }

    def _convert_to_grade(self, score: float) -> str:
        """Convert a numeric score to a letter grade."""
        if score >= 0.9:
            return "A"
        elif score >= 0.8:
            return "B"
        elif score >= 0.7:
            return "C"
        elif score >= 0.6:
            return "D"
        else:
            return "F"

    def _identify_alternatives(self, content: str) -> List[str]:
        """Identify alternative perspectives mentioned."""
        alternatives = []
        alternative_indicators = ['alternatively', 'another perspective', 'different approach',
                                  'could also', 'different view', 'alternative']

        sentences = content.split('.')
        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(indicator in sentence_lower for indicator in alternative_indicators):
                alternatives.append(sentence.strip() + '.')

        return alternatives[:4]

    def _generate_verdict(self, content: str) -> Dict[str, str]:
        """Generate a final verdict based on the evaluation."""
        # Compare the balance of positive and negative language
        positive_count = sum(content.lower().count(word) for word in ['good', 'strong', 'excellent', 'effective'])
        negative_count = sum(content.lower().count(word) for word in ['poor', 'weak', 'lacking', 'insufficient'])

        if positive_count > negative_count * 2:
            verdict = "Approved with minor revisions"
            confidence = "high"
        elif positive_count > negative_count:
            verdict = "Approved with moderate revisions"
            confidence = "medium"
        else:
            verdict = "Requires significant improvements"
            confidence = "medium"

        return {
            "verdict": verdict,
            "confidence": confidence,
            "summary": "Based on comprehensive evaluation across multiple criteria."
        }
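
# Illustrative sketch: the weighted rubric arithmetic. If every criterion
# scored 0.8, the overall score would be
#   0.8 * (0.20 + 0.15 + 0.15 + 0.15 + 0.10 + 0.10 + 0.10 + 0.05) = 0.8
# since the weights sum to 1.0, and _convert_to_grade(0.8) returns "B".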


class SynthesizerAgent(BaseAgent):
    """Agent specialized in synthesizing information and creating coherent narratives."""

    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
        super().__init__(name, AgentRole.SYNTHESIZER, llm)
        self.synthesis_strategies = ["integrate", "summarize", "reconcile", "consolidate", "harmonize"]
        self.output_formats = ["executive_summary", "detailed_report", "action_plan", "strategic_recommendation"]

    async def _execute_task(self, task: Task) -> Any:
        """Execute a synthesis task."""
        # Collect every agent's contributions plus the knowledge base
        synthesis_input = self._gather_synthesis_input(task)

        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="""You are a Synthesizer Agent specializing in integrating diverse information.
Your role is to:
1. Combine multiple perspectives into coherent narratives
2. Resolve contradictions and find common ground
3. Create comprehensive summaries that capture key insights
4. Generate actionable conclusions and recommendations
5. Ensure clarity and accessibility of complex information
6. Prioritize information based on relevance and impact
7. Create structured outputs suitable for decision-making"""),
            HumanMessage(content=f"Synthesize the following information: {task.description}\n\nInput data: {synthesis_input}")
        ])

        response = await self.llm.ainvoke(prompt.format_messages())

        # Structure the raw LLM output into a synthesis result
        synthesis_result = {
            "synthesis": response.content,
            "key_themes": self._extract_themes(response.content),
            "consensus_points": self._identify_consensus(response.content),
            "contradictions": self._identify_contradictions(response.content),
            "final_recommendations": self._generate_final_recommendations(response.content),
            "executive_summary": self._create_executive_summary(response.content),
            "action_items": self._extract_action_items(response.content),
            "confidence_level": self._assess_synthesis_confidence(response.content)
        }

        # Cache for later reference
        self.memory.update_context('latest_synthesis', synthesis_result)

        return synthesis_result

    def _gather_synthesis_input(self, task: Task) -> str:
        """Gather all relevant information for synthesis."""
        input_sections = []

        # Group recent messages by sender
        agent_findings = {}
        for msg in self.memory.get_recent_messages(20):
            if msg.sender not in agent_findings:
                agent_findings[msg.sender] = []
            agent_findings[msg.sender].append(msg.content[:500])

        # One section per contributing agent
        for agent, findings in agent_findings.items():
            if findings:
                input_sections.append(f"\n{agent} Contributions:\n" + "\n".join(findings[:3]))

        # Append knowledge base entries
        knowledge_items = []
        for key, knowledge in self.memory.knowledge_base.items():
            knowledge_items.append(f"{knowledge['type'].title()}: {knowledge['content'][:200]}...")

        if knowledge_items:
            input_sections.append("\nKnowledge Base:\n" + "\n".join(knowledge_items[:5]))

        return "\n".join(input_sections)

    def _extract_themes(self, content: str) -> List[Dict[str, Any]]:
        """Extract major themes from the synthesis."""
        themes = []
        theme_indicators = ['theme', 'pattern', 'trend', 'common', 'recurring',
                            'central', 'key finding', 'main point']

        # Scan paragraph by paragraph for theme language
        paragraphs = content.split('\n\n')
        theme_count = 0

        for paragraph in paragraphs:
            paragraph_lower = paragraph.lower()
            if any(indicator in paragraph_lower for indicator in theme_indicators):
                theme_count += 1
                themes.append({
                    "theme": f"Theme {theme_count}",
                    "description": paragraph.strip()[:300] + "..." if len(paragraph) > 300 else paragraph.strip(),
                    "importance": self._assess_theme_importance(paragraph),
                    "support_level": self._assess_support_level(paragraph)
                })

        # Fallback: treat the first substantive paragraphs as findings
        if not themes and paragraphs:
            for i, paragraph in enumerate(paragraphs[:5]):
                if len(paragraph.strip()) > 50:
                    themes.append({
                        "theme": f"Finding {i+1}",
                        "description": paragraph.strip()[:300] + "..." if len(paragraph) > 300 else paragraph.strip(),
                        "importance": "medium",
                        "support_level": "moderate"
                    })

        return themes[:6]

    def _assess_theme_importance(self, content: str) -> str:
        """Assess the importance of a theme."""
        content_lower = content.lower()

        high_importance_indicators = ['critical', 'essential', 'fundamental', 'crucial', 'vital']
        if any(indicator in content_lower for indicator in high_importance_indicators):
            return "high"

        low_importance_indicators = ['minor', 'secondary', 'marginal', 'peripheral']
        if any(indicator in content_lower for indicator in low_importance_indicators):
            return "low"

        return "medium"

    def _assess_support_level(self, content: str) -> str:
        """Assess the level of support for a theme."""
        content_lower = content.lower()

        strong_support = ['consensus', 'unanimous', 'clear evidence', 'strongly supported']
        if any(indicator in content_lower for indicator in strong_support):
            return "strong"

        weak_support = ['limited evidence', 'some indication', 'preliminary', 'tentative']
        if any(indicator in content_lower for indicator in weak_support):
            return "weak"

        return "moderate"

    def _identify_consensus(self, content: str) -> List[Dict[str, str]]:
        """Identify points of consensus."""
        consensus_points = []
        consensus_indicators = ['agree', 'consensus', 'common', 'shared', 'unanimous',
                                'consistent', 'alignment', 'convergence']

        sentences = content.split('.')
        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(indicator in sentence_lower for indicator in consensus_indicators):
                consensus_points.append({
                    "point": sentence.strip() + '.',
                    "strength": "strong" if "unanimous" in sentence_lower or "clear consensus" in sentence_lower else "moderate"
                })

        return consensus_points[:6]

    def _identify_contradictions(self, content: str) -> List[Dict[str, str]]:
        """Identify contradictions or conflicts."""
        contradictions = []
        conflict_indicators = ['however', 'contrary', 'conflict', 'disagree', 'opposing',
                               'contradicts', 'tension', 'divergent', 'inconsistent']

        sentences = content.split('.')
        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(indicator in sentence_lower for indicator in conflict_indicators):
                contradictions.append({
                    "contradiction": sentence.strip() + '.',
                    "resolution_suggested": self._check_for_resolution(sentence),
                    "impact": self._assess_contradiction_impact(sentence)
                })

        return contradictions[:4]

    def _check_for_resolution(self, sentence: str) -> bool:
        """Check if a resolution is suggested for the contradiction."""
        resolution_indicators = ['can be resolved', 'reconcile', 'bridge', 'common ground', 'compromise']
        return any(indicator in sentence.lower() for indicator in resolution_indicators)

    def _assess_contradiction_impact(self, sentence: str) -> str:
        """Assess the impact of a contradiction."""
        sentence_lower = sentence.lower()

        if any(word in sentence_lower for word in ['fundamental', 'major', 'significant']):
            return "high"
        elif any(word in sentence_lower for word in ['minor', 'small', 'slight']):
            return "low"
        else:
            return "medium"

    def _generate_final_recommendations(self, content: str) -> List[Dict[str, Any]]:
        """Generate final synthesized recommendations."""
        recommendations = []

        # Sentences containing recommendation language
        rec_indicators = ['recommend', 'suggest', 'propose', 'advise', 'should', 'must']
        sentences = content.split('.')

        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(indicator in sentence_lower for indicator in rec_indicators):
                recommendations.append({
                    "recommendation": sentence.strip() + '.',
                    "priority": self._determine_priority(sentence),
                    "timeframe": self._determine_timeframe(sentence),
                    "category": self._categorize_recommendation(sentence)
                })

        # Highest priority first
        priority_order = {"high": 0, "medium": 1, "low": 2}
        recommendations.sort(key=lambda x: priority_order.get(x["priority"], 3))

        return recommendations[:8]

    def _determine_priority(self, sentence: str) -> str:
        """Determine recommendation priority."""
        sentence_lower = sentence.lower()

        if any(word in sentence_lower for word in ['urgent', 'immediate', 'critical', 'must']):
            return "high"
        elif any(word in sentence_lower for word in ['should', 'important', 'recommend']):
            return "medium"
        else:
            return "low"

    def _determine_timeframe(self, sentence: str) -> str:
        """Determine recommendation timeframe."""
        sentence_lower = sentence.lower()

        if any(word in sentence_lower for word in ['immediate', 'now', 'urgent', 'asap']):
            return "immediate"
        elif any(word in sentence_lower for word in ['short-term', 'soon', 'near']):
            return "short-term"
        elif any(word in sentence_lower for word in ['long-term', 'future', 'eventually']):
            return "long-term"
        else:
            return "medium-term"

    def _categorize_recommendation(self, sentence: str) -> str:
        """Categorize the type of recommendation."""
        sentence_lower = sentence.lower()

        if any(word in sentence_lower for word in ['strategy', 'strategic', 'plan']):
            return "strategic"
        elif any(word in sentence_lower for word in ['operational', 'process', 'procedure']):
            return "operational"
        elif any(word in sentence_lower for word in ['tactical', 'action', 'implement']):
            return "tactical"
        else:
            return "general"

    def _create_executive_summary(self, content: str) -> str:
        """Create an executive summary of the synthesis."""
        summary_parts = []

        # Opening statement
        paragraphs = content.split('\n\n')
        if paragraphs:
            opening = paragraphs[0][:200]
            if len(paragraphs[0]) > 200:
                opening += "..."
            summary_parts.append(opening)

        # Paragraphs flagged as key findings
        key_finding_indicators = ['key finding', 'main conclusion', 'importantly', 'notably']
        for paragraph in paragraphs[1:]:
            if any(indicator in paragraph.lower() for indicator in key_finding_indicators):
                summary_parts.append(paragraph[:150] + "..." if len(paragraph) > 150 else paragraph)
                if len(summary_parts) >= 3:
                    break

        # Closing statement
        if len(paragraphs) > 1:
            conclusion = paragraphs[-1][:150]
            if conclusion not in summary_parts:
                summary_parts.append(conclusion + "..." if len(paragraphs[-1]) > 150 else conclusion)

        return " ".join(summary_parts)

    def _extract_action_items(self, content: str) -> List[Dict[str, str]]:
        """Extract specific action items from the synthesis."""
        action_items = []
        action_indicators = ['action:', 'task:', 'todo:', 'action item', 'next step', 'to do']

        lines = content.split('\n')
        for line in lines:
            line_lower = line.lower()
            if any(indicator in line_lower for indicator in action_indicators):
                action_items.append({
                    "action": line.strip(),
                    "owner": "TBD",
                    "deadline": "TBD",
                    "status": "pending"
                })
            # Numbered lines that mention actions also qualify
            elif line.strip() and line.strip()[0].isdigit() and 'action' in line_lower:
                action_items.append({
                    "action": line.strip(),
                    "owner": "TBD",
                    "deadline": "TBD",
                    "status": "pending"
                })

        return action_items[:10]

    def _assess_synthesis_confidence(self, content: str) -> Dict[str, Any]:
        """Assess confidence in the synthesis."""
        word_count = len(content.split())

        # Compare confident vs. hedging vocabulary
        high_confidence_words = ['clear', 'strong', 'definitive', 'conclusive', 'certain']
        low_confidence_words = ['uncertain', 'unclear', 'tentative', 'preliminary', 'limited']

        high_conf_count = sum(content.lower().count(word) for word in high_confidence_words)
        low_conf_count = sum(content.lower().count(word) for word in low_confidence_words)

        # Adjust a base confidence, clamped to [0.3, 0.95]
        base_confidence = 0.7
        confidence_adjustment = (high_conf_count * 0.05) - (low_conf_count * 0.08)
        overall_confidence = max(0.3, min(0.95, base_confidence + confidence_adjustment))

        return {
            "overall": overall_confidence,
            "level": "high" if overall_confidence > 0.8 else "medium" if overall_confidence > 0.6 else "low",
            "factors": {
                "content_depth": word_count > 500,
                "evidence_strength": high_conf_count > low_conf_count,
                "consensus_level": "consensus" in content.lower()
            }
        }
class CoordinatorAgent(BaseAgent): |
|
"""Agent responsible for coordinating other agents and managing workflow.""" |
|
|
|
def __init__(self, name: str, llm: Optional[ChatOpenAI] = None): |
|
super().__init__(name, AgentRole.COORDINATOR, llm) |
|
self.agents: Dict[str, BaseAgent] = {} |
|
self.task_queue: List[Task] = [] |
|
self.completed_tasks: List[Task] = [] |
|
self.workflow_graph = nx.DiGraph() |
|
self.execution_history: List[Dict[str, Any]] = [] |
|
self.workflow_templates = self._create_workflow_templates() |
|
self.collaboration_network = nx.Graph() |
|
|
|
def _create_workflow_templates(self) -> Dict[str, List[Dict[str, Any]]]: |
|
"""Create predefined workflow templates for common problem types.""" |
|
return { |
|
"research_analysis": [ |
|
{"role": "Researcher", "task": "Gather comprehensive information"}, |
|
{"role": "Analyst", "task": "Analyze findings and identify patterns"}, |
|
{"role": "Critic", "task": "Evaluate analysis quality"}, |
|
{"role": "Synthesizer", "task": "Create final recommendations"} |
|
], |
|
"strategic_planning": [ |
|
{"role": "Researcher", "task": "Research current state and trends"}, |
|
{"role": "Analyst", "task": "SWOT analysis and opportunity identification"}, |
|
{"role": "Researcher", "task": "Benchmark best practices"}, |
|
{"role": "Critic", "task": "Risk assessment and gap analysis"}, |
|
{"role": "Synthesizer", "task": "Strategic plan synthesis"} |
|
], |
|
"problem_solving": [ |
|
{"role": "Researcher", "task": "Define problem and gather context"}, |
|
{"role": "Analyst", "task": "Root cause analysis"}, |
|
{"role": "Researcher", "task": "Research potential solutions"}, |
|
{"role": "Critic", "task": "Evaluate solution feasibility"}, |
|
{"role": "Synthesizer", "task": "Recommend optimal solution"} |
|
] |
|
} |
|
|
|
def register_agent(self, agent: BaseAgent): |
|
"""Register an agent with the coordinator.""" |
|
self.agents[agent.name] = agent |
|
self.workflow_graph.add_node(agent.name, role=agent.role.value) |
|
self.collaboration_network.add_node(agent.name, role=agent.role.value) |
|
logger.info(f"Registered agent: {agent.name} with role {agent.role.value}") |
|
|
|
async def decompose_problem(self, problem: str, use_template: bool = False) -> List[Task]: |
|
"""Decompose a complex problem into subtasks.""" |
|
if use_template: |
|
|
|
template_tasks = self._match_problem_to_template(problem) |
|
if template_tasks: |
|
return template_tasks |
|
|
|
if self.llm: |
|
|
|
prompt = ChatPromptTemplate.from_messages([ |
|
SystemMessage(content="""You are a Coordinator Agent responsible for breaking down complex problems. |
|
Decompose the problem into specific subtasks that can be assigned to specialized agents: |
|
- Researcher: For gathering information and facts |
|
- Analyst: For analyzing data and identifying patterns |
|
- Critic: For evaluating quality and identifying issues |
|
- Synthesizer: For combining insights and creating summaries |
|
|
|
Create 4-8 clear, actionable subtasks with dependencies. |
|
Format each task as: [Role]: [Specific task description]"""), |
|
HumanMessage(content=f"Decompose this problem into subtasks: {problem}") |
|
]) |
|
|
|
response = await self.llm.ainvoke(prompt.format_messages()) |
|
tasks = self._parse_tasks(response.content, problem) |
|
else: |
|
|
|
tasks = self._create_default_tasks(problem) |
|
|
|
|
|
for i, task in enumerate(tasks): |
|
task.metadata['problem_complexity'] = self._assess_problem_complexity(problem) |
|
task.metadata['estimated_duration'] = self._estimate_task_duration(task) |
|
|
|
return tasks |
|
|
|
def _match_problem_to_template(self, problem: str) -> Optional[List[Task]]: |
|
"""Match problem to a workflow template.""" |
|
problem_lower = problem.lower() |
|
|
|
|
|
if any(word in problem_lower for word in ['strategy', 'strategic', 'plan']): |
|
template_name = "strategic_planning" |
|
elif any(word in problem_lower for word in ['research', 'analyze', 'investigate']): |
|
template_name = "research_analysis" |
|
elif any(word in problem_lower for word in ['problem', 'solve', 'solution', 'fix']): |
|
template_name = "problem_solving" |
|
else: |
|
return None |
|
|
|
|
|
template = self.workflow_templates[template_name] |
|
tasks = [] |
|
|
|
for i, step in enumerate(template): |
|
task = Task( |
|
id=f"task_{i+1}", |
|
description=f"{step['task']} for: {problem}", |
|
metadata={ |
|
"original_problem": problem, |
|
"suggested_role": step['role'], |
|
"template": template_name |
|
} |
|
) |
|
tasks.append(task) |
|
|
|
return tasks |
|
|
|
def _assess_problem_complexity(self, problem: str) -> str: |
|
"""Assess the complexity of a problem.""" |
|
|
|
complexity_indicators = { |
|
"high": ['multiple', 'complex', 'comprehensive', 'strategic', 'long-term'], |
|
"medium": ['analyze', 'evaluate', 'develop', 'assess'], |
|
"low": ['simple', 'basic', 'straightforward', 'identify'] |
|
} |
|
|
|
problem_lower = problem.lower() |
|
|
|
for level, indicators in complexity_indicators.items(): |
|
if any(indicator in problem_lower for indicator in indicators): |
|
return level |
|
|
|
|
|
return "high" if len(problem) > 200 else "medium" |
|
|
|
def _estimate_task_duration(self, task: Task) -> float: |
|
"""Estimate task duration in seconds.""" |
|
|
|
base_duration = 30.0 |
|
|
|
|
|
role_multipliers = { |
|
"Researcher": 1.2, |
|
"Analyst": 1.5, |
|
"Critic": 1.0, |
|
"Synthesizer": 1.3 |
|
} |
|
|
|
role = task.metadata.get("suggested_role", "Researcher") |
|
duration = base_duration * role_multipliers.get(role, 1.0) |
|
|
|
|
|
complexity = task.metadata.get("problem_complexity", "medium") |
|
if complexity == "high": |
|
duration *= 1.5 |
|
elif complexity == "low": |
|
duration *= 0.7 |
|
|
|
return duration |
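
    # Worked example of the estimate above: an Analyst task on a "high"
    # complexity problem yields 30.0 * 1.5 * 1.5 = 67.5 seconds, while a
    # Critic task on a "low" complexity problem yields 30.0 * 1.0 * 0.7 = 21.0.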
|
|
|
def _parse_tasks(self, content: str, original_problem: str) -> List[Task]: |
|
"""Parse LLM response into Task objects.""" |
|
tasks = [] |
|
lines = content.split('\n') |
|
|
|
task_id = 1 |
|
current_role = None |
|
|
|
for line in lines: |
|
line = line.strip() |
|
if not line: |
|
continue |
|
|
|
|
|
            for role in AgentRole:
                if role.value.lower() in line.lower():
                    current_role = role.value
                    break
|
|
|
|
|
if current_role and ':' in line: |
|
|
|
task_parts = line.split(':', 1) |
|
if len(task_parts) > 1: |
|
task_desc = task_parts[1].strip() |
|
|
|
task = Task( |
|
id=f"task_{task_id}", |
|
description=task_desc, |
|
metadata={ |
|
"original_problem": original_problem, |
|
"suggested_role": current_role, |
|
"source": "llm_decomposition" |
|
} |
|
) |
|
|
|
tasks.append(task) |
|
task_id += 1 |
|
|
|
|
|
if len(tasks) < 4: |
|
tasks.extend(self._create_default_tasks(original_problem)[len(tasks):]) |
|
|
|
return tasks |
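
    # Example of the line format _parse_tasks expects from the LLM (one
    # "[Role]: [description]" pair per line, per the decomposition prompt):
    #
    #     Researcher: Gather adoption statistics for remote work
    #     Analyst: Identify productivity patterns in the gathered data
    #
    # Each matching line becomes a Task whose metadata records the detected
    # role and source="llm_decomposition".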
|
|
|
def _create_default_tasks(self, problem: str) -> List[Task]: |
|
"""Create default tasks for a problem.""" |
|
return [ |
|
Task( |
|
id="task_1", |
|
description=f"Research comprehensive background information on: {problem}", |
|
metadata={"suggested_role": "Researcher", "source": "default"} |
|
), |
|
Task( |
|
id="task_2", |
|
description=f"Analyze key factors and patterns related to: {problem}", |
|
metadata={"suggested_role": "Analyst", "source": "default"} |
|
), |
|
Task( |
|
id="task_3", |
|
description="Critically evaluate the research findings and analysis quality", |
|
metadata={"suggested_role": "Critic", "source": "default"} |
|
), |
|
Task( |
|
id="task_4", |
|
description="Synthesize all findings into actionable insights and recommendations", |
|
metadata={"suggested_role": "Synthesizer", "source": "default"} |
|
) |
|
] |
|
|
|
def _build_dependency_graph(self, tasks: List[Task]): |
|
"""Build a dependency graph for tasks.""" |
|
|
|
role_order = { |
|
"Researcher": 1, |
|
"Analyst": 2, |
|
"Critic": 3, |
|
"Synthesizer": 4 |
|
} |
|
|
|
|
|
sorted_tasks = sorted(tasks, |
|
key=lambda t: role_order.get(t.metadata.get("suggested_role", "Researcher"), 5)) |
|
|
|
|
|
for i in range(len(sorted_tasks) - 1): |
|
current_task = sorted_tasks[i] |
|
next_task = sorted_tasks[i + 1] |
|
|
|
|
|
current_order = role_order.get(current_task.metadata.get("suggested_role"), 0) |
|
next_order = role_order.get(next_task.metadata.get("suggested_role"), 0) |
|
|
|
if next_order >= current_order: |
|
next_task.dependencies.append(current_task.id) |
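
    # Illustration: for the four default tasks the role ordering yields a
    # linear chain (Researcher -> Analyst -> Critic -> Synthesizer), so
    # task_2 depends on task_1, task_3 on task_2, and so on. Tasks sharing
    # a role are also chained, because the comparison above uses >=.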
|
|
|
async def execute_workflow(self, tasks: List[Task], parallel: bool = True) -> Dict[str, Any]: |
|
"""Execute the workflow with given tasks.""" |
|
start_time = datetime.now() |
|
self.performance_tracker.start_tracking() |
|
|
|
|
|
self._build_dependency_graph(tasks) |
|
|
|
|
|
self._update_workflow_graph(tasks) |
|
|
|
|
|
try: |
|
if parallel: |
|
await self._execute_parallel(tasks) |
|
else: |
|
await self._execute_sequential(tasks) |
|
except Exception as e: |
|
logger.error(f"Workflow execution error: {str(e)}") |
|
|
|
|
|
end_time = datetime.now() |
|
self.performance_tracker.end_tracking() |
|
|
|
execution_time = (end_time - start_time).total_seconds() |
|
|
|
|
|
self._update_collaboration_network() |
|
|
|
workflow_result = { |
|
"tasks": tasks, |
|
"execution_time": execution_time, |
|
"success_rate": self._calculate_success_rate(tasks), |
|
"agent_contributions": self._compile_agent_contributions(tasks), |
|
"workflow_graph": self.collaboration_network, |
|
"performance_metrics": self.performance_tracker.get_performance_summary(), |
|
"timestamp": datetime.now() |
|
} |
|
|
|
self.execution_history.append(workflow_result) |
|
|
|
return workflow_result |
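
    # End-to-end sketch (hypothetical; assumes a coordinator with registered
    # agents as in create_gradio_interface below):
    #
    #     tasks = asyncio.run(coordinator.decompose_problem(problem))
    #     result = asyncio.run(coordinator.execute_workflow(tasks, parallel=True))
    #     print(result["success_rate"], result["execution_time"])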
|
|
|
def _update_workflow_graph(self, tasks: List[Task]): |
|
"""Update the workflow graph with task relationships.""" |
|
|
|
for task in tasks: |
|
self.workflow_graph.add_node( |
|
task.id, |
|
                task_description=task.description[:50] + "..." if len(task.description) > 50 else task.description,
|
status=task.status.value |
|
) |
|
|
|
|
|
for task in tasks: |
|
for dep_id in task.dependencies: |
|
self.workflow_graph.add_edge(dep_id, task.id) |
|
|
|
def _update_collaboration_network(self): |
|
"""Update the collaboration network based on agent interactions.""" |
|
|
|
        agent_names = list(self.agents.keys())

        # The edges below assume the default agent names created in
        # create_gradio_interface (Researcher-1, Analyst-1, Critic-1,
        # Synthesizer-1); custom names simply add no edges.
        if len(agent_names) >= 4:
|
|
|
if "Researcher-1" in agent_names and "Analyst-1" in agent_names: |
|
self.collaboration_network.add_edge("Researcher-1", "Analyst-1", weight=3) |
|
|
|
|
|
if "Analyst-1" in agent_names and "Critic-1" in agent_names: |
|
self.collaboration_network.add_edge("Analyst-1", "Critic-1", weight=2) |
|
|
|
|
|
if "Critic-1" in agent_names and "Synthesizer-1" in agent_names: |
|
self.collaboration_network.add_edge("Critic-1", "Synthesizer-1", weight=2) |
|
|
|
|
|
if "Researcher-1" in agent_names and "Synthesizer-1" in agent_names: |
|
self.collaboration_network.add_edge("Researcher-1", "Synthesizer-1", weight=1) |
|
|
|
async def _execute_parallel(self, tasks: List[Task]) -> List[Task]: |
|
"""Execute tasks in parallel where possible.""" |
|
completed = set() |
|
pending = tasks.copy() |
|
|
|
with ThreadPoolExecutor(max_workers=len(self.agents)) as executor: |
|
while pending: |
|
|
|
ready_tasks = [ |
|
task for task in pending |
|
if all(dep in completed for dep in task.dependencies) |
|
] |
|
|
|
if not ready_tasks: |
|
|
|
logger.warning("No ready tasks found, executing first pending task") |
|
ready_tasks = [pending[0]] if pending else [] |
|
|
|
if not ready_tasks: |
|
break |
|
|
|
|
|
                # Dispatch each ready task to its agent. asyncio.run executes
                # inside the worker thread, which has no running event loop of
                # its own, so running the agent coroutine there is safe.
                future_to_task = {}
                for task in ready_tasks:
                    agent_name = self._select_agent_for_task(task)
                    if agent_name and agent_name in self.agents:
                        agent = self.agents[agent_name]
                        future = executor.submit(asyncio.run, agent.process_task(task))
                        future_to_task[future] = (task, agent_name)
|
|
|
|
|
for future in as_completed(future_to_task): |
|
task, agent_name = future_to_task[future] |
|
try: |
|
completed_task = future.result() |
|
completed.add(task.id) |
|
pending.remove(task) |
|
self.completed_tasks.append(completed_task) |
|
|
|
|
|
self.workflow_graph.add_edge( |
|
self.name, agent_name, |
|
task_id=task.id, |
|
timestamp=datetime.now().isoformat() |
|
) |
|
|
|
|
|
await self._facilitate_collaboration(completed_task, agent_name) |
|
|
|
                    except Exception as e:
                        logger.error(f"Task {task.id} failed: {str(e)}")
                        task.status = TaskStatus.FAILED
                        # Remove the failed task so the scheduler does not
                        # retry it forever, and mark its id resolved so that
                        # dependent tasks are not deadlocked.
                        completed.add(task.id)
                        pending.remove(task)
|
|
|
return tasks |
|
|
|
async def _execute_sequential(self, tasks: List[Task]) -> List[Task]: |
|
"""Execute tasks sequentially.""" |
|
for task in tasks: |
|
agent_name = self._select_agent_for_task(task) |
|
if agent_name and agent_name in self.agents: |
|
agent = self.agents[agent_name] |
|
completed_task = await agent.process_task(task) |
|
|
|
|
|
self.workflow_graph.add_edge( |
|
self.name, agent_name, |
|
task_id=task.id, |
|
timestamp=datetime.now().isoformat() |
|
) |
|
|
|
self.completed_tasks.append(completed_task) |
|
|
|
|
|
await self._facilitate_collaboration(completed_task, agent_name) |
|
|
|
return tasks |
|
|
|
def _select_agent_for_task(self, task: Task) -> Optional[str]: |
|
"""Select the best agent for a given task.""" |
|
suggested_role = task.metadata.get("suggested_role") |
|
|
|
|
|
for agent_name, agent in self.agents.items(): |
|
if agent.role.value == suggested_role: |
|
|
|
if agent.active and agent.current_task is None: |
|
return agent_name |
|
|
|
|
|
for agent_name, agent in self.agents.items(): |
|
if agent.role.value == suggested_role: |
|
return agent_name |
|
|
|
|
|
for agent_name, agent in self.agents.items(): |
|
if agent.active: |
|
return agent_name |
|
|
|
return None |
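
    # Selection falls through three tiers: an idle agent with the suggested
    # role, then any agent with that role (even if busy), then any active
    # agent at all; None means no agent is available.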
|
|
|
async def _facilitate_collaboration(self, task: Task, agent_name: str): |
|
"""Facilitate collaboration between agents after task completion.""" |
|
if not task.result or task.status != TaskStatus.COMPLETED: |
|
return |
|
|
|
|
|
collab_message = AgentMessage( |
|
sender=agent_name, |
|
recipient="all", |
|
content=f"Task completed: {task.description}\nKey findings: {str(task.result)[:500]}", |
|
message_type=MessageType.COMPLETION_REPORT, |
|
priority=3 |
|
) |
|
|
|
|
|
shared_count = 0 |
|
for other_agent_name, other_agent in self.agents.items(): |
|
if other_agent_name != agent_name: |
|
|
|
if self._should_collaborate(task, other_agent): |
|
await other_agent.memory.add_message(collab_message) |
|
self.performance_tracker.record_collaboration() |
|
self.performance_tracker.record_message() |
|
shared_count += 1 |
|
|
|
|
|
if shared_count > 0: |
|
logger.info(f"Agent {agent_name} shared findings with {shared_count} other agents") |
|
|
|
def _should_collaborate(self, task: Task, agent: BaseAgent) -> bool: |
|
"""Determine if an agent should receive collaboration message.""" |
|
|
|
if agent.role == AgentRole.SYNTHESIZER: |
|
return True |
|
|
|
|
|
if agent.role == AgentRole.CRITIC and task.metadata.get("suggested_role") in ["Researcher", "Analyst"]: |
|
return True |
|
|
|
|
|
if agent.role == AgentRole.ANALYST and task.metadata.get("suggested_role") == "Researcher": |
|
return True |
|
|
|
return False |
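
    # Routing summary: Synthesizers hear about every completed task; Critics
    # hear about Researcher and Analyst output; Analysts hear about
    # Researcher output; all other pairings are skipped.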
|
|
|
def _calculate_success_rate(self, tasks: List[Task]) -> float: |
|
"""Calculate the success rate of task execution.""" |
|
if not tasks: |
|
return 0.0 |
|
|
|
successful = sum(1 for task in tasks if task.status == TaskStatus.COMPLETED) |
|
return successful / len(tasks) |
|
|
|
def _compile_agent_contributions(self, tasks: List[Task]) -> Dict[str, Any]: |
|
"""Compile contributions from each agent.""" |
|
contributions = {} |
|
|
|
for agent_name, agent in self.agents.items(): |
|
agent_tasks = [task for task in tasks if task.assigned_to == agent_name] |
|
|
|
if agent_tasks: |
|
total_execution_time = sum( |
|
task.performance_metrics.get('execution_time', 0) |
|
for task in agent_tasks |
|
) |
|
|
|
avg_confidence = np.mean([task.confidence for task in agent_tasks]) |
|
|
|
contributions[agent_name] = { |
|
"role": agent.role.value, |
|
"tasks_completed": len(agent_tasks), |
|
"average_confidence": avg_confidence, |
|
"total_execution_time": total_execution_time, |
|
"collaboration_count": len(agent.collaboration_partners), |
|
"status": agent.get_status_summary() |
|
} |
|
else: |
|
contributions[agent_name] = { |
|
"role": agent.role.value, |
|
"tasks_completed": 0, |
|
"average_confidence": 0.0, |
|
"total_execution_time": 0.0, |
|
"collaboration_count": 0, |
|
"status": agent.get_status_summary() |
|
} |
|
|
|
return contributions |
|
|
|
def get_workflow_insights(self) -> Dict[str, Any]: |
|
"""Get insights about workflow execution patterns.""" |
|
if not self.execution_history: |
|
return {"message": "No execution history available"} |
|
|
|
|
|
total_executions = len(self.execution_history) |
|
avg_execution_time = np.mean([wf['execution_time'] for wf in self.execution_history]) |
|
avg_success_rate = np.mean([wf['success_rate'] for wf in self.execution_history]) |
|
|
|
|
|
agent_stats = {} |
|
for workflow in self.execution_history: |
|
for agent, contrib in workflow['agent_contributions'].items(): |
|
if agent not in agent_stats: |
|
agent_stats[agent] = { |
|
'total_tasks': 0, |
|
'total_time': 0, |
|
'confidence_scores': [] |
|
} |
|
|
|
agent_stats[agent]['total_tasks'] += contrib['tasks_completed'] |
|
agent_stats[agent]['total_time'] += contrib['total_execution_time'] |
|
if contrib['average_confidence'] > 0: |
|
agent_stats[agent]['confidence_scores'].append(contrib['average_confidence']) |
|
|
|
|
|
agent_efficiency = {} |
|
for agent, stats in agent_stats.items(): |
|
if stats['total_tasks'] > 0: |
|
agent_efficiency[agent] = { |
|
'avg_time_per_task': stats['total_time'] / stats['total_tasks'], |
|
'avg_confidence': np.mean(stats['confidence_scores']) if stats['confidence_scores'] else 0, |
|
'total_tasks': stats['total_tasks'] |
|
} |
|
|
|
return { |
|
'total_workflows_executed': total_executions, |
|
'average_execution_time': avg_execution_time, |
|
'average_success_rate': avg_success_rate, |
|
'agent_efficiency': agent_efficiency, |
|
'most_efficient_agent': min(agent_efficiency.items(), |
|
key=lambda x: x[1]['avg_time_per_task'])[0] if agent_efficiency else None, |
|
'highest_quality_agent': max(agent_efficiency.items(), |
|
key=lambda x: x[1]['avg_confidence'])[0] if agent_efficiency else None |
|
} |
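
    # Example shape of the returned insights dict (values illustrative):
    #
    #     {
    #         "total_workflows_executed": 3,
    #         "average_execution_time": 42.7,
    #         "average_success_rate": 0.92,
    #         "agent_efficiency": {"Analyst-1": {"avg_time_per_task": 8.1, ...}},
    #         "most_efficient_agent": "Critic-1",
    #         "highest_quality_agent": "Synthesizer-1",
    #     }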
|
|
|
class WorkflowVisualizer: |
|
"""Handles visualization of agent interactions and workflow.""" |
|
|
|
def __init__(self): |
|
self.color_map = Config.NODE_COLORS |
|
self.layout_cache = {} |
|
self.animation_frames = [] |
|
|
|
def create_workflow_graph(self, workflow_graph: nx.Graph, |
|
active_agents: List[str] = None, |
|
highlight_tasks: List[str] = None) -> go.Figure: |
|
"""Create an interactive workflow visualization.""" |
|
|
|
if len(workflow_graph.nodes()) == 0: |
|
return self._create_empty_graph() |
|
|
|
|
|
pos = nx.spring_layout(workflow_graph, k=2, iterations=50) |
|
|
|
|
|
edge_trace = self._create_edge_trace(workflow_graph, pos) |
|
node_trace = self._create_node_trace(workflow_graph, pos, active_agents, highlight_tasks) |
|
|
|
|
|
fig = go.Figure( |
|
data=[edge_trace, node_trace], |
|
layout=go.Layout( |
|
title={ |
|
'text': 'Agent Collaboration Network', |
|
'x': 0.5, |
|
'xanchor': 'center', |
|
'font': {'size': 16, 'color': '#2c3e50'} |
|
}, |
|
showlegend=False, |
|
hovermode='closest', |
|
margin=dict(b=40, l=40, r=40, t=60), |
|
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), |
|
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), |
|
plot_bgcolor='#f8f9fa', |
|
paper_bgcolor='white', |
|
font=dict(family='Inter, sans-serif'), |
|
) |
|
) |
|
|
|
return fig |
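
    # Usage sketch (assumes a workflow result produced by
    # CoordinatorAgent.execute_workflow above):
    #
    #     visualizer = WorkflowVisualizer()
    #     fig = visualizer.create_workflow_graph(result["workflow_graph"])
    #     fig.show()  # or hand the figure to a gr.Plot component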
|
|
|
def _create_empty_graph(self) -> go.Figure: |
|
"""Create empty graph with message.""" |
|
fig = go.Figure() |
|
fig.add_annotation( |
|
text="No workflow data to display.<br>Start an analysis to see agent interactions.", |
|
xref="paper", yref="paper", |
|
x=0.5, y=0.5, |
|
showarrow=False, |
|
font=dict(size=16, color='#7f8c8d'), |
|
bgcolor='#ecf0f1', |
|
borderpad=20 |
|
) |
|
fig.update_layout( |
|
height=500, |
|
plot_bgcolor='#f8f9fa', |
|
paper_bgcolor='white' |
|
) |
|
return fig |
|
|
|
def _create_edge_trace(self, G: nx.Graph, pos: Dict) -> go.Scatter: |
|
"""Create edge trace for the graph.""" |
|
edge_x = [] |
|
edge_y = [] |
|
|
|
for edge in G.edges(): |
|
if edge[0] in pos and edge[1] in pos: |
|
x0, y0 = pos[edge[0]] |
|
x1, y1 = pos[edge[1]] |
|
edge_x.extend([x0, x1, None]) |
|
edge_y.extend([y0, y1, None]) |
|
|
|
edge_trace = go.Scatter( |
|
x=edge_x, y=edge_y, |
|
line=dict(width=2, color='#95a5a6'), |
|
hoverinfo='none', |
|
mode='lines' |
|
) |
|
|
|
return edge_trace |
|
|
|
def _create_node_trace(self, G: nx.Graph, pos: Dict, |
|
active_agents: List[str] = None, |
|
highlight_tasks: List[str] = None) -> go.Scatter: |
|
"""Create node trace for the graph.""" |
|
node_x = [] |
|
node_y = [] |
|
node_colors = [] |
|
node_sizes = [] |
|
node_text = [] |
|
|
|
for node in G.nodes(): |
|
if node in pos: |
|
x, y = pos[node] |
|
node_x.append(x) |
|
node_y.append(y) |
|
|
|
|
|
node_data = G.nodes[node] |
|
role = node_data.get('role', '') |
|
|
|
|
|
                # Color by role; enlarge nodes for currently active agents so
                # the active_agents parameter is reflected in the plot
                color = self.color_map.get(role, '#95a5a6')
                size = 50 if active_agents and node in active_agents else 40
|
|
|
node_colors.append(color) |
|
node_sizes.append(size) |
|
node_text.append(node) |
|
|
|
node_trace = go.Scatter( |
|
x=node_x, y=node_y, |
|
mode='markers+text', |
|
hoverinfo='text', |
|
text=node_text, |
|
hovertext=node_text, |
|
textposition="bottom center", |
|
marker=dict( |
|
showscale=False, |
|
color=node_colors, |
|
size=node_sizes, |
|
line=dict(color='white', width=2) |
|
) |
|
) |
|
|
|
return node_trace |
|
|
|
def create_task_timeline(self, tasks: List[Task]) -> go.Figure: |
|
"""Create a timeline visualization of task execution.""" |
|
|
|
|
|
timeline_data = [] |
|
|
|
for task in tasks: |
|
if task.created_at: |
|
|
|
end_time = task.completed_at if task.completed_at else task.created_at + timedelta(seconds=30) |
|
|
|
timeline_data.append({ |
|
'Task': task.id, |
|
'Agent': task.assigned_to or 'Unassigned', |
|
'Start': task.created_at, |
|
'Finish': end_time, |
|
'Status': task.status.value, |
|
'Confidence': task.confidence |
|
}) |
|
|
|
if not timeline_data: |
|
return self._create_empty_timeline() |
|
|
|
|
|
df = pd.DataFrame(timeline_data) |
|
|
|
|
|
fig = px.timeline( |
|
df, |
|
x_start="Start", |
|
x_end="Finish", |
|
y="Agent", |
|
color="Confidence", |
|
hover_data=["Task", "Status"], |
|
color_continuous_scale="Viridis", |
|
labels={'Confidence': 'Confidence Score'} |
|
) |
|
|
|
|
|
fig.update_layout( |
|
title={ |
|
'text': 'Task Execution Timeline', |
|
'x': 0.5, |
|
'xanchor': 'center', |
|
'font': {'size': 18, 'color': '#2c3e50'} |
|
}, |
|
height=400, |
|
xaxis_title="Time", |
|
yaxis_title="Agent", |
|
plot_bgcolor='#f8f9fa', |
|
paper_bgcolor='white', |
|
font=dict(family='Inter, sans-serif'), |
|
yaxis={'categoryorder': 'total ascending'} |
|
) |
|
|
|
return fig |
|
|
|
def _create_empty_timeline(self) -> go.Figure: |
|
"""Create empty timeline with message.""" |
|
fig = go.Figure() |
|
fig.add_annotation( |
|
text="No task execution data available yet.", |
|
xref="paper", yref="paper", |
|
x=0.5, y=0.5, |
|
showarrow=False, |
|
font=dict(size=14, color='#7f8c8d') |
|
) |
|
fig.update_layout( |
|
height=400, |
|
plot_bgcolor='#f8f9fa', |
|
paper_bgcolor='white' |
|
) |
|
return fig |
|
|
|
def create_confidence_heatmap(self, agent_contributions: Dict[str, Any]) -> go.Figure: |
|
"""Create a heatmap showing agent performance metrics.""" |
|
|
|
if not agent_contributions: |
|
return self._create_empty_heatmap() |
|
|
|
|
|
agents = list(agent_contributions.keys()) |
|
metrics = ['Tasks Completed', 'Avg Confidence', 'Time Efficiency', 'Collaboration Score'] |
|
|
|
|
|
data = [] |
|
for metric in metrics: |
|
row = [] |
|
for agent in agents: |
|
contrib = agent_contributions[agent] |
|
|
|
if metric == 'Tasks Completed': |
|
                    value = min(1.0, contrib.get('tasks_completed', 0) / 5.0)
|
elif metric == 'Avg Confidence': |
|
value = contrib.get('average_confidence', 0) |
|
elif metric == 'Time Efficiency': |
|
|
|
                    # Renamed from `time`/`tasks` to avoid shadowing the
                    # imported time module and unrelated outer names
                    total_time = contrib.get('total_execution_time', 1)
                    task_count = contrib.get('tasks_completed', 1)
                    avg_time = total_time / task_count if task_count > 0 else float('inf')
|
value = min(1.0, 30.0 / avg_time) if avg_time > 0 else 0 |
|
elif metric == 'Collaboration Score': |
|
value = min(1.0, contrib.get('collaboration_count', 0) / 3.0) |
|
else: |
|
value = 0 |
|
|
|
row.append(value) |
|
data.append(row) |
|
|
|
|
|
fig = go.Figure(data=go.Heatmap( |
|
z=data, |
|
x=agents, |
|
y=metrics, |
|
colorscale='Blues', |
|
text=np.round(data, 2), |
|
texttemplate='%{text}', |
|
textfont={"size": 12}, |
|
colorbar=dict(title={'text': "Score", 'side': 'right'}), |
|
hoverongaps=False |
|
)) |
|
|
|
|
|
fig.update_layout( |
|
title={ |
|
'text': 'Agent Performance Metrics', |
|
'x': 0.5, |
|
'xanchor': 'center', |
|
'font': {'size': 18, 'color': '#2c3e50'} |
|
}, |
|
xaxis_title="Agents", |
|
yaxis_title="Metrics", |
|
height=350, |
|
plot_bgcolor='white', |
|
paper_bgcolor='white', |
|
font=dict(family='Inter, sans-serif') |
|
) |
|
|
|
return fig |
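
    # The heatmap normalizes each metric to [0, 1]: tasks completed are
    # scaled against an assumed 5-task workload, time efficiency against a
    # 30-second per-task baseline, and collaborations against 3 exchanges.
    # These constants are illustrative tuning knobs, not measured baselines.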
|
|
|
def _create_empty_heatmap(self) -> go.Figure: |
|
"""Create empty heatmap with message.""" |
|
fig = go.Figure() |
|
fig.add_annotation( |
|
text="No agent performance data available yet.", |
|
xref="paper", yref="paper", |
|
x=0.5, y=0.5, |
|
showarrow=False, |
|
font=dict(size=14, color='#7f8c8d') |
|
) |
|
fig.update_layout( |
|
height=350, |
|
plot_bgcolor='#f8f9fa', |
|
paper_bgcolor='white' |
|
) |
|
return fig |
|
|
|
def create_performance_comparison(self, performance_metrics: Dict[str, Any]) -> go.Figure: |
|
"""Create performance comparison visualization.""" |
|
|
|
|
|
baseline_time = Config.BENCHMARK_BASELINE['single_agent_time'] |
|
actual_time = performance_metrics.get('average_task_completion_time', baseline_time) |
|
time_improvement = performance_metrics.get('time_improvement_percentage', 0) |
|
|
|
|
|
categories = ['Single Agent', 'Multi-Agent System'] |
|
values = [baseline_time, actual_time] |
|
colors = ['#e74c3c', '#2ecc71'] |
|
|
|
fig = go.Figure(data=[ |
|
go.Bar( |
|
x=categories, |
|
y=values, |
|
marker_color=colors, |
|
text=[f'{v:.1f}s' for v in values], |
|
textposition='auto' |
|
) |
|
]) |
|
|
|
|
|
if time_improvement > 0: |
|
fig.add_annotation( |
|
x=1, y=actual_time + 5, |
|
text=f"{time_improvement:.1f}% Faster", |
|
showarrow=True, |
|
arrowhead=2, |
|
arrowsize=1, |
|
arrowwidth=2, |
|
arrowcolor='#2ecc71', |
|
                font=dict(size=14, color='#2ecc71')  # older Plotly releases reject a font `weight` key
|
) |
|
|
|
|
|
fig.update_layout( |
|
title={ |
|
'text': 'Performance Comparison', |
|
'x': 0.5, |
|
'xanchor': 'center', |
|
'font': {'size': 18, 'color': '#2c3e50'} |
|
}, |
|
yaxis_title="Average Completion Time (seconds)", |
|
height=400, |
|
plot_bgcolor='#f8f9fa', |
|
paper_bgcolor='white', |
|
font=dict(family='Inter, sans-serif'), |
|
showlegend=False |
|
) |
|
|
|
return fig |
|
|
|
class ReportGenerator: |
|
"""Generates comprehensive PDF reports from multi-agent collaboration.""" |
|
|
|
def __init__(self): |
|
self.styles = getSampleStyleSheet() |
|
self.custom_styles = self._create_custom_styles() |
|
|
|
def _create_custom_styles(self) -> Dict[str, ParagraphStyle]: |
|
"""Create custom paragraph styles for the report.""" |
|
|
|
custom_styles = {} |
|
|
|
|
|
custom_styles['CustomTitle'] = ParagraphStyle( |
|
'CustomTitle', |
|
parent=self.styles['Heading1'], |
|
fontSize=24, |
|
textColor=colors.HexColor('#2c3e50'), |
|
spaceAfter=30, |
|
alignment=1 |
|
) |
|
|
|
|
|
custom_styles['SectionHeader'] = ParagraphStyle( |
|
'SectionHeader', |
|
parent=self.styles['Heading2'], |
|
fontSize=16, |
|
textColor=colors.HexColor('#34495e'), |
|
spaceAfter=12, |
|
spaceBefore=20 |
|
) |
|
|
|
|
|
custom_styles['CustomBody'] = ParagraphStyle( |
|
'CustomBody', |
|
parent=self.styles['BodyText'], |
|
fontSize=10, |
|
leading=14, |
|
spaceAfter=10 |
|
) |
|
|
|
return custom_styles |
|
|
|
def generate_report(self, |
|
workflow_result: Dict[str, Any], |
|
problem_statement: str, |
|
include_sections: List[str] = None, |
|
                        filename: str = "multi_agent_analysis_report.pdf") -> Optional[str]:
|
"""Generate comprehensive PDF report from workflow results.""" |
|
|
|
try: |
|
|
|
doc = SimpleDocTemplate( |
|
filename, |
|
pagesize=letter, |
|
rightMargin=72, |
|
leftMargin=72, |
|
topMargin=72, |
|
bottomMargin=18 |
|
) |
|
|
|
|
|
elements = [] |
|
|
|
|
|
elements.append(Paragraph("Multi-Agent Analysis Report", self.custom_styles['CustomTitle'])) |
|
elements.append(Paragraph(Config.COMPANY_NAME, self.styles['Heading3'])) |
|
elements.append(Spacer(1, 0.2*inch)) |
|
elements.append(Paragraph(f"Generated on: {datetime.now().strftime('%B %d, %Y at %I:%M %p')}", |
|
self.styles['Normal'])) |
|
elements.append(Spacer(1, 0.3*inch)) |
|
elements.append(Paragraph(f"<b>Problem Statement:</b> {problem_statement}", |
|
self.custom_styles['CustomBody'])) |
|
elements.append(PageBreak()) |
|
|
|
|
|
section_methods = { |
|
'executive_summary': self._add_executive_summary, |
|
'task_analysis': self._add_task_analysis, |
|
'agent_contributions': self._add_agent_contributions, |
|
'key_findings': self._add_key_findings, |
|
'recommendations': self._add_recommendations, |
|
'confidence_analysis': self._add_confidence_analysis, |
|
'performance_metrics': self._add_performance_metrics |
|
} |
|
|
|
if include_sections is None: |
|
include_sections = list(section_methods.keys()) |
|
|
|
for section in include_sections: |
|
if section in section_methods: |
|
section_methods[section](elements, workflow_result, problem_statement) |
|
|
|
|
|
elements.append(PageBreak()) |
|
elements.append(Paragraph("Report Generation Details", self.custom_styles['SectionHeader'])) |
|
|
|
execution_time = workflow_result.get('execution_time', 0) |
|
timestamp = workflow_result.get('timestamp', datetime.now()) |
|
|
|
footer_text = f""" |
|
Analysis completed in {execution_time:.1f} seconds<br/> |
|
Report generated at {timestamp.strftime('%B %d, %Y at %I:%M %p')}<br/> |
|
<br/> |
|
Powered by {Config.COMPANY_NAME}<br/> |
|
Advanced Multi-Agent AI Collaboration System |
|
""" |
|
|
|
elements.append(Paragraph(footer_text, self.styles['Normal'])) |
|
|
|
|
|
doc.build(elements) |
|
|
|
return filename |
|
|
|
except Exception as e: |
|
logger.error(f"Error generating report: {str(e)}") |
|
return None |
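
    # Report generation sketch (hypothetical; assumes a completed workflow
    # result and its problem statement):
    #
    #     generator = ReportGenerator()
    #     path = generator.generate_report(
    #         workflow_result=result,
    #         problem_statement="Develop a market entry strategy",
    #         include_sections=["executive_summary", "recommendations"],
    #     )
    #     if path:
    #         print(f"PDF written to {path}")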
|
|
|
def _add_executive_summary(self, elements: list, workflow_result: Dict, problem_statement: str): |
|
"""Add executive summary section to report.""" |
|
elements.append(Paragraph("Executive Summary", self.custom_styles['SectionHeader'])) |
|
|
|
tasks = workflow_result.get('tasks', []) |
|
success_rate = workflow_result.get('success_rate', 0) |
|
execution_time = workflow_result.get('execution_time', 0) |
|
performance = workflow_result.get('performance_metrics', {}) |
|
|
|
|
|
synthesis_task = None |
|
for task in tasks: |
|
if task.assigned_to and 'Synthesizer' in task.assigned_to: |
|
synthesis_task = task |
|
break |
|
|
|
summary_text = f""" |
|
The multi-agent system successfully analyzed the problem through coordinated efforts of |
|
specialized agents, achieving a <b>{success_rate:.0%} task completion rate</b> in |
|
<b>{execution_time:.1f} seconds</b>. |
|
""" |
|
|
|
elements.append(Paragraph(summary_text, self.custom_styles['CustomBody'])) |
|
|
|
if synthesis_task and isinstance(synthesis_task.result, dict): |
|
exec_summary = synthesis_task.result.get('executive_summary', '') |
|
if exec_summary: |
|
elements.append(Spacer(1, 0.1*inch)) |
|
elements.append(Paragraph(exec_summary, self.custom_styles['CustomBody'])) |
|
|
|
|
|
time_improvement = performance.get('time_improvement_percentage', 0) |
|
efficiency_score = performance.get('efficiency_score', 0) |
|
|
|
performance_text = f""" |
|
<br/> |
|
<b>Key Performance Indicators:</b><br/> |
|
• Performance Improvement: {time_improvement:.1f}% faster than single-agent approach<br/> |
|
• System Efficiency Score: {efficiency_score:.2f}/1.0<br/> |
|
• Total Collaborations: {performance.get('total_collaborations', 0)}<br/> |
|
""" |
|
|
|
elements.append(Paragraph(performance_text, self.custom_styles['CustomBody'])) |
|
elements.append(Spacer(1, 0.2*inch)) |
|
|
|
def _add_task_analysis(self, elements: list, workflow_result: Dict, problem_statement: str): |
|
"""Add task analysis section to report.""" |
|
elements.append(Paragraph("Task Analysis", self.custom_styles['SectionHeader'])) |
|
|
|
tasks = workflow_result.get('tasks', []) |
|
|
|
|
|
completed_tasks = [t for t in tasks if t.status == TaskStatus.COMPLETED] |
|
failed_tasks = [t for t in tasks if t.status == TaskStatus.FAILED] |
|
|
|
overview_text = f""" |
|
<b>Task Overview:</b><br/> |
|
• Total Tasks: {len(tasks)}<br/> |
|
• Completed: {len(completed_tasks)}<br/> |
|
• Failed: {len(failed_tasks)}<br/> |
|
• Average Confidence: {np.mean([t.confidence for t in completed_tasks]) if completed_tasks else 0:.2%}<br/> |
|
""" |
|
|
|
elements.append(Paragraph(overview_text, self.custom_styles['CustomBody'])) |
|
elements.append(Spacer(1, 0.1*inch)) |
|
|
|
|
|
task_data = [['Task ID', 'Description', 'Agent', 'Status', 'Confidence']] |
|
|
|
for task in tasks: |
|
task_data.append([ |
|
task.id, |
|
task.description[:50] + '...' if len(task.description) > 50 else task.description, |
|
task.assigned_to or 'N/A', |
|
task.status.value.title(), |
|
f"{task.confidence:.0%}" |
|
]) |
|
|
|
task_table = Table(task_data, colWidths=[1*inch, 2.5*inch, 1.2*inch, 1*inch, 1*inch]) |
|
task_table.setStyle(TableStyle([ |
|
('BACKGROUND', (0, 0), (-1, 0), colors.grey), |
|
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), |
|
('ALIGN', (0, 0), (-1, -1), 'LEFT'), |
|
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), |
|
('FONTSIZE', (0, 0), (-1, -1), 9), |
|
('BOTTOMPADDING', (0, 0), (-1, 0), 12), |
|
('BACKGROUND', (0, 1), (-1, -1), colors.beige), |
|
('GRID', (0, 0), (-1, -1), 1, colors.black) |
|
])) |
|
|
|
elements.append(task_table) |
|
elements.append(Spacer(1, 0.2*inch)) |
|
|
|
def _add_agent_contributions(self, elements: list, workflow_result: Dict, problem_statement: str): |
|
"""Add agent contributions section to report.""" |
|
elements.append(Paragraph("Agent Contributions", self.custom_styles['SectionHeader'])) |
|
|
|
contributions = workflow_result.get('agent_contributions', {}) |
|
|
|
for agent, stats in contributions.items(): |
|
role = stats.get('role', 'Unknown') |
|
tasks_completed = stats.get('tasks_completed', 0) |
|
avg_confidence = stats.get('average_confidence', 0) |
|
exec_time = stats.get('total_execution_time', 0) |
|
collab_count = stats.get('collaboration_count', 0) |
|
|
|
agent_text = f""" |
|
<b>{agent} ({role}):</b><br/> |
|
• Tasks Completed: {tasks_completed}<br/> |
|
• Average Confidence: {avg_confidence:.0%}<br/> |
|
• Total Execution Time: {exec_time:.1f}s<br/> |
|
• Collaborations: {collab_count}<br/> |
|
""" |
|
|
|
elements.append(Paragraph(agent_text, self.custom_styles['CustomBody'])) |
|
elements.append(Spacer(1, 0.1*inch)) |
|
|
|
def _add_key_findings(self, elements: list, workflow_result: Dict, problem_statement: str): |
|
"""Add key findings section to report.""" |
|
elements.append(Paragraph("Key Findings", self.custom_styles['SectionHeader'])) |
|
|
|
tasks = workflow_result.get('tasks', []) |
|
|
|
findings_by_type = { |
|
'Research Findings': [], |
|
'Analytical Insights': [], |
|
'Critical Observations': [], |
|
'Synthesized Conclusions': [] |
|
} |
|
|
|
|
|
for task in tasks: |
|
if task.status == TaskStatus.COMPLETED and task.result: |
|
role = task.metadata.get('suggested_role', '') |
|
|
|
if isinstance(task.result, dict): |
|
if 'Researcher' in role and 'key_points' in task.result: |
|
findings_by_type['Research Findings'].extend(task.result['key_points'][:3]) |
|
|
|
elif 'Analyst' in role and 'insights' in task.result: |
|
findings_by_type['Analytical Insights'].extend(task.result['insights'][:3]) |
|
|
|
elif 'Critic' in role and 'strengths' in task.result: |
|
for strength in task.result['strengths'][:2]: |
|
if isinstance(strength, dict): |
|
findings_by_type['Critical Observations'].append( |
|
strength.get('strength', str(strength)) |
|
) |
|
else: |
|
findings_by_type['Critical Observations'].append(str(strength)) |
|
|
|
elif 'Synthesizer' in role and 'key_themes' in task.result: |
|
for theme in task.result['key_themes'][:2]: |
|
if isinstance(theme, dict): |
|
findings_by_type['Synthesized Conclusions'].append( |
|
theme.get('description', str(theme)) |
|
) |
|
else: |
|
findings_by_type['Synthesized Conclusions'].append(str(theme)) |
|
|
|
|
|
for finding_type, findings in findings_by_type.items(): |
|
if findings: |
|
elements.append(Paragraph(f"<b>{finding_type}:</b>", self.styles['Heading4'])) |
|
for finding in findings: |
|
elements.append(Paragraph(f"• {finding}", self.custom_styles['CustomBody'])) |
|
elements.append(Spacer(1, 0.1*inch)) |
|
|
|
def _add_recommendations(self, elements: list, workflow_result: Dict, problem_statement: str): |
|
"""Add recommendations section to report.""" |
|
elements.append(Paragraph("Recommendations", self.custom_styles['SectionHeader'])) |
|
|
|
tasks = workflow_result.get('tasks', []) |
|
|
|
all_recommendations = [] |
|
|
|
|
|
for task in tasks: |
|
if task.status == TaskStatus.COMPLETED and task.result: |
|
if isinstance(task.result, dict): |
|
                    # `result_field` avoids shadowing dataclasses.field imported above
                    for result_field in ['recommendations', 'final_recommendations', 'improvements']:
                        if result_field in task.result:
                            recs = task.result[result_field]
|
for rec in recs: |
|
if isinstance(rec, dict): |
|
all_recommendations.append(rec) |
|
else: |
|
all_recommendations.append({ |
|
'recommendation': str(rec), |
|
'priority': 'medium', |
|
'source': task.assigned_to |
|
}) |
|
|
|
if not all_recommendations: |
|
elements.append(Paragraph("No specific recommendations were generated.", |
|
self.custom_styles['CustomBody'])) |
|
return |
|
|
|
|
|
priority_groups = {'high': [], 'medium': [], 'low': []} |
|
|
|
for rec in all_recommendations: |
|
priority = rec.get('priority', 'medium') |
|
if priority in priority_groups: |
|
priority_groups[priority].append(rec) |
|
|
|
|
|
for priority, recs in priority_groups.items(): |
|
if recs: |
|
elements.append(Paragraph(f"<b>{priority.title()} Priority:</b>", self.styles['Heading4'])) |
|
for rec in recs[:5]: |
|
rec_text = rec.get('recommendation', rec) |
|
elements.append(Paragraph(f"• {rec_text}", self.custom_styles['CustomBody'])) |
|
elements.append(Spacer(1, 0.1*inch)) |
|
|
|
def _add_confidence_analysis(self, elements: list, workflow_result: Dict, problem_statement: str): |
|
"""Add confidence analysis section to report.""" |
|
elements.append(Paragraph("Confidence Analysis", self.custom_styles['SectionHeader'])) |
|
|
|
tasks = workflow_result.get('tasks', []) |
|
contributions = workflow_result.get('agent_contributions', {}) |
|
|
|
|
|
task_confidences = [t.confidence for t in tasks if t.confidence > 0] |
|
overall_confidence = np.mean(task_confidences) if task_confidences else 0 |
|
|
|
confidence_text = f""" |
|
<b>Overall Confidence Score: {overall_confidence:.0%}</b><br/> |
|
<br/> |
|
The confidence score reflects the system's assessment of result quality and reliability |
|
based on evidence strength, consistency, and completeness. |
|
""" |
|
|
|
elements.append(Paragraph(confidence_text, self.custom_styles['CustomBody'])) |
|
elements.append(Spacer(1, 0.1*inch)) |
|
|
|
|
|
conf_data = [['Agent Role', 'Average Confidence', 'Tasks Completed']] |
|
|
|
for agent, stats in contributions.items(): |
|
role = stats.get('role', 'Unknown') |
|
            avg_conf = stats.get('average_confidence', 0)
            task_count = stats.get('tasks_completed', 0)  # avoid rebinding the `tasks` list above
            conf_data.append([role, f"{avg_conf:.0%}", str(task_count)])
|
|
|
conf_table = Table(conf_data, colWidths=[2*inch, 1.5*inch, 1.5*inch]) |
|
conf_table.setStyle(TableStyle([ |
|
('BACKGROUND', (0, 0), (-1, 0), colors.grey), |
|
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), |
|
('ALIGN', (0, 0), (-1, -1), 'CENTER'), |
|
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), |
|
('FONTSIZE', (0, 0), (-1, -1), 10), |
|
('BOTTOMPADDING', (0, 0), (-1, 0), 12), |
|
('BACKGROUND', (0, 1), (-1, -1), colors.beige), |
|
('GRID', (0, 0), (-1, -1), 1, colors.black) |
|
])) |
|
|
|
elements.append(conf_table) |
|
elements.append(Spacer(1, 0.2*inch)) |
|
|
|
def _add_performance_metrics(self, elements: list, workflow_result: Dict, problem_statement: str): |
|
"""Add performance metrics section to report.""" |
|
elements.append(Paragraph("Performance Metrics", self.custom_styles['SectionHeader'])) |
|
|
|
performance = workflow_result.get('performance_metrics', {}) |
|
|
|
if not performance: |
|
elements.append(Paragraph("No performance metrics available.", |
|
self.custom_styles['CustomBody'])) |
|
return |
|
|
|
|
|
total_time = performance.get('total_execution_time', 0) |
|
avg_task_time = performance.get('average_task_completion_time', 0) |
|
total_collab = performance.get('total_collaborations', 0) |
|
total_messages = performance.get('total_messages', 0) |
|
efficiency = performance.get('efficiency_score', 0) |
|
time_improvement = performance.get('time_improvement_percentage', 0) |
|
|
|
metrics_text = f""" |
|
<b>System Performance Overview:</b><br/> |
|
<br/> |
|
• Total Execution Time: {total_time:.1f}s<br/> |
|
• Average Task Time: {avg_task_time:.1f}s<br/> |
|
• Time Improvement: {time_improvement:.1f}% faster than baseline<br/> |
|
• Total Collaborations: {total_collab}<br/> |
|
• Message Exchanges: {total_messages}<br/> |
|
• Efficiency Score: {efficiency:.2%}<br/> |
|
""" |
|
|
|
elements.append(Paragraph(metrics_text, self.custom_styles['CustomBody'])) |
|
|
|
|
|
elements.append(Spacer(1, 0.1*inch)) |
|
elements.append(Paragraph("<b>Performance Insights:</b>", self.styles['Heading4'])) |
|
|
|
insights = [] |
|
if time_improvement > 30: |
|
insights.append("Exceptional Performance: The multi-agent system achieved significant time savings through parallel processing.") |
|
elif time_improvement > 15: |
|
insights.append("Good Performance: The system demonstrated efficient task distribution and execution.") |
|
else: |
|
insights.append("Standard Performance: The system completed tasks within expected parameters.") |
|
|
|
if efficiency > 0.8: |
|
insights.append("High Efficiency: Excellent resource utilization and agent coordination.") |
|
elif efficiency > 0.6: |
|
insights.append("Moderate Efficiency: Good balance between speed and quality.") |
|
else: |
|
insights.append("Efficiency Opportunity: Consider optimizing agent workflows for better performance.") |
|
|
|
for insight in insights: |
|
elements.append(Paragraph(f"• {insight}", self.custom_styles['CustomBody'])) |
|
|
|
|
|
def create_gradio_interface(): |
|
"""Create the main Gradio interface for the multi-agent system.""" |
|
|
|
|
|
coordinator = None |
|
visualizer = WorkflowVisualizer() |
|
report_generator = ReportGenerator() |
|
|
|
|
|
current_workflow = None |
|
current_problem = "" |
|
demo_mode = False |
|
|
|
def initialize_agents(api_key: str, model: str = "gpt-4", use_demo: bool = False) -> str: |
|
"""Initialize the multi-agent system.""" |
|
nonlocal coordinator, demo_mode |
|
|
|
demo_mode = use_demo |
|
|
|
if not use_demo and not api_key: |
|
return "Please provide an OpenAI API key or enable Demo Mode to initialize the agents." |
|
|
|
try: |
|
|
|
llm = None |
|
if not use_demo and api_key: |
|
llm = ChatOpenAI( |
|
api_key=api_key, |
|
model=model, |
|
temperature=Config.TEMPERATURE, |
|
max_tokens=Config.MAX_TOKENS |
|
) |
|
|
|
|
|
coordinator = CoordinatorAgent("Coordinator", llm) |
|
|
|
|
|
researcher = ResearcherAgent("Researcher-1", llm) |
|
analyst = AnalystAgent("Analyst-1", llm) |
|
critic = CriticAgent("Critic-1", llm) |
|
synthesizer = SynthesizerAgent("Synthesizer-1", llm) |
|
|
|
|
|
coordinator.register_agent(researcher) |
|
coordinator.register_agent(analyst) |
|
coordinator.register_agent(critic) |
|
coordinator.register_agent(synthesizer) |
|
|
|
mode_text = "Demo Mode" if use_demo else f"Live Mode ({model})" |
|
return f"Successfully initialized multi-agent system with {len(coordinator.agents)} agents in {mode_text}." |
|
|
|
except Exception as e: |
|
logger.error(f"Error initializing agents: {str(e)}") |
|
return f"Error initializing agents: {str(e)}" |
|
|
|
async def analyze_problem(problem: str, execution_mode: str, use_template: bool = False) -> Tuple[str, Any, Any, Any, Any]: |
|
"""Analyze a problem using the multi-agent system.""" |
|
nonlocal current_workflow, current_problem |
|
|
|
if not coordinator: |
|
return "Please initialize the agents first.", None, None, None, None |
|
|
|
if not problem: |
|
return "Please enter a problem to analyze.", None, None, None, None |
|
|
|
current_problem = problem |
|
|
|
try: |
|
|
|
status = "Decomposing problem into subtasks..." |
|
|
|
|
|
tasks = await coordinator.decompose_problem(problem, use_template=use_template) |
|
|
|
if not tasks: |
|
return "Failed to decompose problem into tasks.", None, None, None, None |
|
|
|
|
|
status = f"Executing {len(tasks)} tasks using {execution_mode} mode..." |
|
parallel = execution_mode == "Parallel" |
|
|
|
|
|
current_workflow = await coordinator.execute_workflow(tasks, parallel=parallel) |
|
|
|
|
|
active_agents = list(coordinator.agents.keys()) |
|
|
|
workflow_graph = visualizer.create_workflow_graph( |
|
current_workflow['workflow_graph'], |
|
active_agents=active_agents |
|
) |
|
|
|
timeline_chart = visualizer.create_task_timeline(tasks) |
|
|
|
confidence_heatmap = visualizer.create_confidence_heatmap( |
|
current_workflow['agent_contributions'] |
|
) |
|
|
|
performance_chart = visualizer.create_performance_comparison( |
|
current_workflow['performance_metrics'] |
|
) |
|
|
|
|
|
success_rate = current_workflow['success_rate'] |
|
execution_time = current_workflow['execution_time'] |
|
performance = current_workflow['performance_metrics'] |
|
|
|
status = f"""Analysis completed successfully! |
|
|
|
Results Summary: |
|
- Tasks executed: {len(tasks)} |
|
- Success rate: {success_rate:.0%} |
|
- Execution time: {execution_time:.1f} seconds |
|
- Performance improvement: {performance.get('time_improvement_percentage', 0):.1f}% faster |
|
- Agents involved: {len(coordinator.agents)} |
|
|
|
Agent Activity: |
|
- Total collaborations: {performance.get('total_collaborations', 0)} |
|
- Messages exchanged: {performance.get('total_messages', 0)} |
|
- Efficiency score: {performance.get('efficiency_score', 0):.2%}""" |
|
|
|
return status, workflow_graph, timeline_chart, confidence_heatmap, performance_chart |
|
|
|
except Exception as e: |
|
logger.error(f"Error analyzing problem: {str(e)}") |
|
return f"Error during analysis: {str(e)}", None, None, None, None |
|
|
|
def generate_report(selected_sections: List[str]) -> Tuple[Optional[str], str]: |
|
"""Generate a report from the current workflow results.""" |
|
|
|
if not current_workflow: |
|
return None, "No analysis results available. Please run an analysis first." |
|
|
|
try: |
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
filename = f"multi_agent_report_{timestamp}.pdf" |
|
|
|
report_path = report_generator.generate_report( |
|
current_workflow, |
|
current_problem, |
|
include_sections=selected_sections, |
|
filename=filename |
|
) |
|
|
|
if report_path: |
|
return report_path, f"Report generated successfully: {filename}" |
|
else: |
|
return None, "Error generating report." |
|
|
|
except Exception as e: |
|
logger.error(f"Error generating report: {str(e)}") |
|
return None, f"Error generating report: {str(e)}" |
|
|
|
def get_agent_details(agent_name: str) -> str: |
|
"""Get detailed information about a specific agent.""" |
|
|
|
if not coordinator or agent_name not in coordinator.agents: |
|
return "Agent not found or system not initialized." |
|
|
|
agent = coordinator.agents[agent_name] |
|
status = agent.get_status_summary() |
|
|
|
details = f"""## Agent Profile: {agent.name} |
|
|
|
**Role:** {agent.role.value} |
|
**Status:** {'Active' if agent.active else 'Inactive'} |
|
**Completed Tasks:** {len(agent.completed_tasks)} |
|
**Current Task:** {agent.current_task.description if agent.current_task else 'None'} |
|
**Average Confidence:** {status['average_confidence']:.0%} |
|
**Collaborations:** {status['collaboration_count']} |
|
|
|
### Recent Task History""" |
|
|
|
for i, task in enumerate(agent.completed_tasks[-5:], 1): |
|
status_icon = "✓" if task.status == TaskStatus.COMPLETED else "✗" |
|
exec_time = task.performance_metrics.get('execution_time', 0) |
|
|
|
details += f""" |
|
|
|
**{i}. {status_icon} {task.id}** |
|
- Description: {task.description} |
|
- Confidence: {task.confidence:.0%} |
|
- Execution Time: {exec_time:.1f}s""" |
|
|
|
|
|
if agent.performance_tracker.metrics['task_completion_times']: |
|
avg_time = np.mean(agent.performance_tracker.metrics['task_completion_times']) |
|
details += f"\n\n### Performance Statistics\n" |
|
details += f"• Average Task Time: {avg_time:.1f}s\n" |
|
details += f"• Total Active Time: {sum(agent.performance_tracker.metrics['task_completion_times']):.1f}s" |
|
|
|
return details |
|
|
|
def get_workflow_insights() -> str: |
|
"""Get insights about the multi-agent system performance.""" |
|
|
|
if not coordinator: |
|
return "System not initialized." |
|
|
|
insights = coordinator.get_workflow_insights() |
|
|
|
if insights.get('total_workflows_executed', 0) == 0: |
|
return "No workflow executions yet. Run an analysis to see performance insights." |
|
|
|
content = f"""## Workflow Insights |
|
|
|
### System Performance Overview |
|
|
|
- **Total Workflows:** {insights['total_workflows_executed']} |
|
- **Average Execution Time:** {insights['average_execution_time']:.1f}s |
|
- **Average Success Rate:** {insights['average_success_rate']:.0%} |
|
- **Most Efficient Agent:** {insights.get('most_efficient_agent', 'N/A')} |
|
- **Highest Quality Agent:** {insights.get('highest_quality_agent', 'N/A')} |
|
|
|
### Agent Efficiency Rankings""" |
|
|
|
if insights.get('agent_efficiency'): |
|
content += "\n\n| Agent | Avg Time/Task | Avg Confidence | Total Tasks |\n" |
|
content += "|-------|---------------|----------------|-------------|\n" |
|
|
|
for agent, efficiency in insights['agent_efficiency'].items(): |
|
content += f"| {agent} | {efficiency['avg_time_per_task']:.1f}s | " |
|
content += f"{efficiency['avg_confidence']:.0%} | {efficiency['total_tasks']} |\n" |
|
|
|
return content |
|
|
|
|
|
with gr.Blocks( |
|
title="Multi-Agent AI Collaboration System", |
|
theme=gr.themes.Base(), |
|
css=""" |
|
.gradio-container { |
|
font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', sans-serif; |
|
max-width: 1400px; |
|
margin: 0 auto; |
|
} |
|
|
|
h1, h2, h3 { |
|
color: #2c3e50; |
|
} |
|
|
|
.gr-button-primary { |
|
background-color: #3498db !important; |
|
border-color: #3498db !important; |
|
} |
|
|
|
.gr-button-primary:hover { |
|
background-color: #2980b9 !important; |
|
border-color: #2980b9 !important; |
|
} |
|
|
|
.status-box { |
|
background-color: #f3f4f6; |
|
border-radius: 8px; |
|
padding: 1rem; |
|
margin: 1rem 0; |
|
border-left: 4px solid #3b82f6; |
|
} |
|
""" |
|
) as interface: |
|
|
|
gr.Markdown(""" |
|
# Multi-Agent AI Collaboration System |
|
|
|
Advanced AI system with specialized agents working together to solve complex problems through intelligent task decomposition and parallel processing. |
|
""") |
|
|
|
|
|
with gr.Group(): |
|
gr.Markdown("### System Configuration") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=3): |
|
api_key_input = gr.Textbox( |
|
label="OpenAI API Key", |
|
placeholder="sk-...", |
|
type="password", |
|
info="Required for live mode. Leave empty for demo mode." |
|
) |
|
with gr.Column(scale=1): |
|
model_select = gr.Dropdown( |
|
choices=["gpt-4", "gpt-3.5-turbo"], |
|
value="gpt-4", |
|
label="Model", |
|
info="Select the LLM model" |
|
) |
|
with gr.Column(scale=1): |
|
demo_mode_checkbox = gr.Checkbox( |
|
label="Demo Mode", |
|
value=False, |
|
info="Run without API key" |
|
) |
|
with gr.Column(scale=1): |
|
init_button = gr.Button( |
|
"Initialize Agents", |
|
variant="primary", |
|
size="lg" |
|
) |
|
|
|
init_status = gr.Textbox( |
|
label="Initialization Status", |
|
interactive=False |
|
) |
|
|
|
|
|
with gr.Tabs() as tabs: |
|
|
|
with gr.TabItem("Problem Analysis", id=1): |
|
with gr.Group(): |
|
gr.Markdown("### Enter a complex problem for multi-agent analysis") |
|
|
|
problem_input = gr.Textbox( |
|
label="Problem Statement", |
|
placeholder="Example: Analyze the potential impact of AI on healthcare delivery in the next 5 years", |
|
lines=3, |
|
info="Describe a complex problem that requires multiple perspectives" |
|
) |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
execution_mode = gr.Radio( |
|
choices=["Sequential", "Parallel"], |
|
value="Parallel", |
|
label="Execution Mode", |
|
info="Parallel mode is faster but requires more resources" |
|
) |
|
with gr.Column(scale=1): |
|
use_template = gr.Checkbox( |
|
label="Use Workflow Template", |
|
value=True, |
|
info="Automatically match to predefined workflows" |
|
) |
|
with gr.Column(scale=2): |
|
analyze_button = gr.Button( |
|
"Analyze Problem", |
|
variant="primary", |
|
size="lg" |
|
) |
|
|
|
analysis_status = gr.Textbox( |
|
label="Analysis Status", |
|
interactive=False, |
|
lines=10 |
|
) |
|
|
|
|
|
with gr.Group(): |
|
gr.Markdown("### Analysis Visualizations") |
|
|
|
with gr.Row(): |
|
workflow_graph = gr.Plot(label="Agent Collaboration Network") |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
timeline_chart = gr.Plot(label="Task Execution Timeline") |
|
with gr.Column(): |
|
confidence_heatmap = gr.Plot(label="Agent Performance Metrics") |
|
|
|
with gr.Row(): |
|
performance_chart = gr.Plot(label="Performance Comparison") |
|
|
|
|
|
with gr.TabItem("Agent Details", id=2): |
|
with gr.Group(): |
|
gr.Markdown("### View detailed information about each agent") |
|
|
|
with gr.Row(): |
|
agent_selector = gr.Dropdown( |
|
choices=["Researcher-1", "Analyst-1", "Critic-1", "Synthesizer-1"], |
|
label="Select Agent", |
|
info="Choose an agent to view their profile and performance" |
|
) |
|
agent_details_button = gr.Button( |
|
"Get Agent Details", |
|
variant="secondary" |
|
) |
|
|
|
agent_details_output = gr.Markdown() |
|
|
|
with gr.Group(): |
|
gr.Markdown("### System Insights") |
|
|
|
insights_button = gr.Button( |
|
"Get Workflow Insights", |
|
variant="secondary" |
|
) |
|
|
|
insights_output = gr.Markdown() |
|
|
|
|
|
with gr.TabItem("Report Generation", id=3): |
|
with gr.Group(): |
|
gr.Markdown("### Generate comprehensive analysis report") |
|
|
|
section_selector = gr.CheckboxGroup( |
|
choices=[ |
|
"executive_summary", |
|
"task_analysis", |
|
"agent_contributions", |
|
"key_findings", |
|
"recommendations", |
|
"confidence_analysis", |
|
"performance_metrics" |
|
], |
|
value=[ |
|
"executive_summary", |
|
"key_findings", |
|
"recommendations", |
|
"confidence_analysis" |
|
], |
|
label="Select Report Sections", |
|
info="Choose which sections to include in the report" |
|
) |
|
|
|
generate_report_button = gr.Button( |
|
"Generate PDF Report", |
|
variant="primary", |
|
size="lg" |
|
) |
|
|
|
with gr.Row(): |
|
report_download = gr.File(label="Download Report") |
|
report_status = gr.Textbox(label="Report Status", interactive=False) |
|
|
|
|
|
with gr.TabItem("Example Problems", id=4): |
|
gr.Markdown(""" |
|
### Example Problems for Analysis |
|
|
|
Click on any example to load it into the analysis tab. These examples demonstrate different types of complex problems suitable for multi-agent analysis. |
|
""") |
|
|
|
example_problems = [ |
|
{ |
|
"title": "Business Strategy", |
|
"problem": "Develop a comprehensive strategy for a traditional retail company to transition to e-commerce while maintaining customer loyalty and managing existing physical stores", |
|
"description": "Complex business transformation requiring market analysis, risk assessment, and strategic planning" |
|
}, |
|
{ |
|
"title": "Technology Assessment", |
|
"problem": "Evaluate the potential risks and benefits of implementing blockchain technology in supply chain management for a global manufacturing company", |
|
"description": "Technical evaluation requiring understanding of emerging technology and business operations" |
|
}, |
|
{ |
|
"title": "Market Analysis", |
|
"problem": "Analyze the competitive landscape for electric vehicles and identify key success factors for new entrants in the North American market", |
|
"description": "Market research requiring industry analysis, competitor assessment, and trend identification" |
|
}, |
|
{ |
|
"title": "Policy Evaluation", |
|
"problem": "Assess the implications of remote work policies on organizational culture, productivity, and talent retention in technology companies", |
|
"description": "Organizational analysis requiring understanding of human resources, culture, and productivity metrics" |
|
}, |
|
{ |
|
"title": "Innovation Planning", |
|
"problem": "Design an innovation framework for a healthcare organization to integrate AI-powered diagnostic tools while ensuring patient privacy and regulatory compliance", |
|
"description": "Innovation strategy requiring technical, regulatory, and ethical considerations" |
|
} |
|
] |
|
|
|
example_buttons = [] |
|
for i, example in enumerate(example_problems): |
|
with gr.Group(): |
|
gr.Markdown(f""" |
|
#### {example['title']} |
|
*{example['description']}* |
|
""") |
|
btn = gr.Button( |
|
f"Load This Example", |
|
variant="secondary", |
|
size="sm" |
|
) |
|
example_buttons.append((btn, example['problem'])) |
|
|
|
|
|
with gr.TabItem("Help", id=5): |
|
gr.Markdown(""" |
|
## How to Use the Multi-Agent AI Collaboration System |
|
|
|
### Getting Started |
|
|
|
1. **Initialize the System** |
|
- Enter your OpenAI API key for live analysis (optional) |
|
- Or enable Demo Mode to explore without an API key |
|
- Select your preferred model (GPT-4 recommended) |
|
- Click "Initialize Agents" |
|
|
|
2. **Analyze a Problem** |
|
- Enter a complex problem in the Problem Analysis tab |
|
- Choose execution mode (Parallel is faster) |
|
- Optionally use workflow templates for common problem types |
|
- Click "Analyze Problem" |
|
|
|
3. **Review Results** |
|
- View the agent collaboration network |
|
- Check the task execution timeline |
|
- Review performance metrics |
|
- Explore individual agent details |
|
|
|
4. **Generate Report** |
|
- Select desired report sections |
|
- Click "Generate PDF Report" |
|
- Download the comprehensive analysis |
|
|
|
### Understanding the Agents |
|
|
|
- **Researcher**: Gathers information and identifies key facts |
|
- **Analyst**: Processes data and identifies patterns |
|
- **Critic**: Evaluates quality and identifies gaps |
|
- **Synthesizer**: Combines insights into actionable recommendations |
|
- **Coordinator**: Manages workflow and facilitates collaboration |
|
|
|
### Tips for Best Results |
|
|
|
- Be specific and detailed in your problem statements |
|
- Complex, multi-faceted problems work best |
|
- Use parallel execution for faster results |
|
- Review agent details to understand the analysis process |
|
- Generate reports for comprehensive documentation |
|
|
|
### Troubleshooting |
|
|
|
- **Initialization fails**: Check your API key or enable Demo Mode |
|
- **Analysis takes too long**: Try Sequential mode or simpler problems |
|
- **Empty visualizations**: Ensure analysis completed successfully |
|
- **Report generation fails**: Check that analysis was completed first |
|
""") |
|
|
|
|
|
init_button.click( |
|
fn=initialize_agents, |
|
inputs=[api_key_input, model_select, demo_mode_checkbox], |
|
outputs=init_status |
|
) |
|
|
|
analyze_button.click( |
|
            fn=analyze_problem,  # Gradio runs coroutine functions natively; no asyncio.run wrapper needed
|
inputs=[problem_input, execution_mode, use_template], |
|
outputs=[analysis_status, workflow_graph, timeline_chart, confidence_heatmap, performance_chart] |
|
) |
|
|
|
agent_details_button.click( |
|
fn=get_agent_details, |
|
inputs=agent_selector, |
|
outputs=agent_details_output |
|
) |
|
|
|
insights_button.click( |
|
fn=get_workflow_insights, |
|
inputs=[], |
|
outputs=insights_output |
|
) |
|
|
|
generate_report_button.click( |
|
fn=generate_report, |
|
inputs=section_selector, |
|
outputs=[report_download, report_status] |
|
) |
|
|
|
|
|
for btn, problem in example_buttons: |
|
btn.click( |
|
fn=lambda p=problem: p, |
|
outputs=problem_input |
|
).then( |
|
                fn=lambda: gr.Tabs(selected=1),  # gr.Tabs.update was removed in Gradio 4.x
|
outputs=tabs |
|
) |
|
|
|
return interface |
|
|
|
|
|
if __name__ == "__main__": |
|
interface = create_gradio_interface() |
|
interface.launch( |
|
share=True, |
|
server_name="0.0.0.0", |
|
server_port=7860, |
|
favicon_path=None, |
|
show_error=True |
|
) |