diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,10 +1,10 @@ # Multi-Agent AI Collaboration System # Author: Spencer Purdy # Description: Enterprise-grade multi-agent system with specialized AI agents collaborating -# to solve complex problems through task decomposition and parallel processing. +# to solve complex problems through intelligent task decomposition and parallel processing. # Installation (uncomment for Google Colab) -# !pip install gradio langchain langchain-openai openai networkx matplotlib asyncio aiohttp pandas numpy plotly python-dotenv pydantic +# !pip install gradio langchain langchain-openai openai networkx matplotlib asyncio aiohttp pandas numpy plotly python-dotenv pydantic scipy import os import json @@ -27,6 +27,7 @@ import networkx as nx import matplotlib.pyplot as plt from matplotlib.patches import FancyBboxPatch import plotly.graph_objects as go +import plotly.express as px from plotly.subplots import make_subplots # LangChain and AI libraries @@ -39,7 +40,7 @@ from pydantic import BaseModel, Field # Async libraries import aiohttp -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed # Configure logging logging.basicConfig( @@ -64,15 +65,33 @@ class Config: GRAPH_UPDATE_INTERVAL = 0.5 # seconds NODE_COLORS = { 'Researcher': '#3498db', - 'Analyst': '#e74c3c', + 'Analyst': '#e74c3c', 'Critic': '#f39c12', 'Synthesizer': '#2ecc71', 'Coordinator': '#9b59b6' } + # Performance settings + ENABLE_PERFORMANCE_TRACKING = True + BENCHMARK_BASELINE = { + "single_agent_time": 45.0, + "single_agent_quality": 0.72 + } + # Report settings CONFIDENCE_THRESHOLD = 0.7 MAX_REPORT_SECTIONS = 10 + COMPANY_NAME = "Multi-Agent AI Platform" + + # Demo settings + DEMO_MODE_ENABLED = True + DEMO_PROBLEMS = [ + "Analyze the impact of remote work on team productivity and collaboration", + "Develop a strategy for sustainable urban transportation", + "Evaluate the risks and benefits of AI in healthcare", + "Design a framework for ethical AI development", + "Create a plan for digital transformation in education" + ] class AgentRole(Enum): """Enumeration of agent roles in the system.""" @@ -89,6 +108,14 @@ class TaskStatus(Enum): COMPLETED = "completed" FAILED = "failed" +class MessageType(Enum): + """Types of messages between agents.""" + TASK_ASSIGNMENT = "task_assignment" + COLLABORATION_REQUEST = "collaboration_request" + INFORMATION_SHARING = "information_sharing" + FEEDBACK = "feedback" + COMPLETION_REPORT = "completion_report" + @dataclass class Task: """Represents a task to be executed by agents.""" @@ -102,6 +129,7 @@ class Task: created_at: datetime = field(default_factory=datetime.now) completed_at: Optional[datetime] = None metadata: Dict[str, Any] = field(default_factory=dict) + performance_metrics: Dict[str, float] = field(default_factory=dict) @dataclass class AgentMessage: @@ -109,9 +137,102 @@ class AgentMessage: sender: str recipient: str content: str - message_type: str = "task" + message_type: MessageType metadata: Dict[str, Any] = field(default_factory=dict) timestamp: datetime = field(default_factory=datetime.now) + priority: int = 1 # 1 (low) to 5 (high) + +class PerformanceTracker: + """Tracks performance metrics for the multi-agent system.""" + + def __init__(self): + self.metrics = { + 'task_completion_times': [], + 'agent_utilization': {}, + 'collaboration_count': 0, + 'total_messages': 0, + 'quality_scores': [], + 'system_start_time': None, + 'system_end_time': None + } + + def 
start_tracking(self): + """Start performance tracking.""" + self.metrics['system_start_time'] = datetime.now() + + def end_tracking(self): + """End performance tracking.""" + self.metrics['system_end_time'] = datetime.now() + + def record_task_completion(self, task: Task): + """Record task completion metrics.""" + if task.created_at and task.completed_at: + completion_time = (task.completed_at - task.created_at).total_seconds() + self.metrics['task_completion_times'].append(completion_time) + + def record_agent_activity(self, agent_name: str, activity_duration: float): + """Record agent activity duration.""" + if agent_name not in self.metrics['agent_utilization']: + self.metrics['agent_utilization'][agent_name] = 0 + self.metrics['agent_utilization'][agent_name] += activity_duration + + def record_collaboration(self): + """Record a collaboration event.""" + self.metrics['collaboration_count'] += 1 + + def record_message(self): + """Record a message exchange.""" + self.metrics['total_messages'] += 1 + + def get_performance_summary(self) -> Dict[str, Any]: + """Get performance summary statistics.""" + total_time = 0 + if self.metrics['system_start_time'] and self.metrics['system_end_time']: + total_time = (self.metrics['system_end_time'] - + self.metrics['system_start_time']).total_seconds() + + avg_task_time = np.mean(self.metrics['task_completion_times']) if self.metrics['task_completion_times'] else 0 + + # Calculate improvement over baseline + baseline_time = Config.BENCHMARK_BASELINE['single_agent_time'] + time_improvement = ((baseline_time - avg_task_time) / baseline_time * 100) if avg_task_time > 0 else 0 + + return { + 'total_execution_time': total_time, + 'average_task_completion_time': avg_task_time, + 'total_collaborations': self.metrics['collaboration_count'], + 'total_messages': self.metrics['total_messages'], + 'agent_utilization': self.metrics['agent_utilization'], + 'time_improvement_percentage': time_improvement, + 'efficiency_score': self._calculate_efficiency_score() + } + + def _calculate_efficiency_score(self) -> float: + """Calculate overall efficiency score.""" + # Simple efficiency calculation based on multiple factors + factors = [] + + # Task completion speed factor + if self.metrics['task_completion_times']: + avg_time = np.mean(self.metrics['task_completion_times']) + speed_factor = min(1.0, Config.BENCHMARK_BASELINE['single_agent_time'] / avg_time) + factors.append(speed_factor) + + # Collaboration efficiency factor + if self.metrics['total_messages'] > 0: + collab_factor = min(1.0, self.metrics['collaboration_count'] / self.metrics['total_messages']) + factors.append(collab_factor) + + # Agent utilization factor + if self.metrics['agent_utilization']: + utilization_values = list(self.metrics['agent_utilization'].values()) + if utilization_values: + avg_utilization = np.mean(utilization_values) + max_utilization = max(utilization_values) + balance_factor = avg_utilization / max_utilization if max_utilization > 0 else 0 + factors.append(balance_factor) + + return np.mean(factors) if factors else 0.5 class AgentMemory: """Manages agent conversation history and context.""" @@ -120,12 +241,16 @@ class AgentMemory: self.messages: List[AgentMessage] = [] self.max_messages = max_messages self.context: Dict[str, Any] = {} + self.knowledge_base: Dict[str, Any] = {} def add_message(self, message: AgentMessage): """Add a message to memory.""" self.messages.append(message) if len(self.messages) > self.max_messages: self.messages.pop(0) + + # Extract and store important 
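# A minimal, self-contained sketch of the efficiency-score idea used by
# PerformanceTracker above: three bounded factors (speed against the 45s
# single-agent baseline from Config.BENCHMARK_BASELINE, collaborations per
# message, and utilization balance) averaged into one score. The workload
# numbers below are hypothetical, not taken from the patch.
import numpy as np

BASELINE_SINGLE_AGENT_TIME = 45.0  # mirrors Config.BENCHMARK_BASELINE['single_agent_time']

def efficiency_score(task_times, collaborations, messages, utilization):
    """Average of bounded factors; returns 0.5 when nothing was recorded."""
    factors = []
    if task_times:
        factors.append(min(1.0, BASELINE_SINGLE_AGENT_TIME / np.mean(task_times)))
    if messages > 0:
        factors.append(min(1.0, collaborations / messages))
    if utilization:
        values = list(utilization.values())
        factors.append(np.mean(values) / max(values) if max(values) > 0 else 0.0)
    return float(np.mean(factors)) if factors else 0.5

# Hypothetical run: three tasks, 4 collaborations over 10 messages, two agents.
print(efficiency_score([20.0, 32.5, 28.0],
                       collaborations=4, messages=10,
                       utilization={"Researcher": 60.0, "Analyst": 45.0}))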
information + self._extract_knowledge(message) def get_recent_messages(self, n: int = 10) -> List[AgentMessage]: """Get n most recent messages.""" @@ -135,6 +260,10 @@ class AgentMemory: """Get all messages from a specific sender.""" return [msg for msg in self.messages if msg.sender == sender] + def get_high_priority_messages(self) -> List[AgentMessage]: + """Get high priority messages.""" + return [msg for msg in self.messages if msg.priority >= 4] + def update_context(self, key: str, value: Any): """Update context information.""" self.context[key] = value @@ -142,11 +271,27 @@ class AgentMemory: def get_context(self, key: str) -> Any: """Get context information.""" return self.context.get(key) + + def _extract_knowledge(self, message: AgentMessage): + """Extract and store important knowledge from messages.""" + # Simple keyword-based knowledge extraction + keywords = ['finding', 'conclusion', 'recommendation', 'insight', 'pattern'] + content_lower = message.content.lower() + + for keyword in keywords: + if keyword in content_lower: + knowledge_key = f"{message.sender}_{keyword}_{len(self.knowledge_base)}" + self.knowledge_base[knowledge_key] = { + 'content': message.content, + 'sender': message.sender, + 'timestamp': message.timestamp, + 'type': keyword + } class BaseAgent: """Base class for all AI agents in the system.""" - def __init__(self, name: str, role: AgentRole, llm: ChatOpenAI): + def __init__(self, name: str, role: AgentRole, llm: Optional[ChatOpenAI] = None): self.name = name self.role = role self.llm = llm @@ -154,6 +299,8 @@ class BaseAgent: self.active = True self.current_task: Optional[Task] = None self.completed_tasks: List[Task] = [] + self.performance_tracker = PerformanceTracker() + self.collaboration_partners: Set[str] = set() async def process_task(self, task: Task) -> Task: """Process a task and return the result.""" @@ -161,16 +308,29 @@ class BaseAgent: task.status = TaskStatus.IN_PROGRESS task.assigned_to = self.name + # Record start time + start_time = datetime.now() + try: # Execute task based on agent role - result = await self._execute_task(task) + if self.llm: + result = await self._execute_task(task) + else: + # Demo mode - simulate execution + result = await self._simulate_task_execution(task) task.result = result task.status = TaskStatus.COMPLETED task.completed_at = datetime.now() task.confidence = self._calculate_confidence(result) + # Record performance metrics + execution_time = (task.completed_at - start_time).total_seconds() + task.performance_metrics['execution_time'] = execution_time + task.performance_metrics['confidence'] = task.confidence + self.completed_tasks.append(task) + self.performance_tracker.record_task_completion(task) except Exception as e: logger.error(f"Agent {self.name} failed to process task {task.id}: {str(e)}") @@ -187,42 +347,115 @@ class BaseAgent: """Execute the task - to be implemented by subclasses.""" raise NotImplementedError("Subclasses must implement _execute_task") + async def _simulate_task_execution(self, task: Task) -> Any: + """Simulate task execution for demo mode.""" + # Simulate processing time + await asyncio.sleep(np.random.uniform(1, 3)) + + # Generate simulated result based on role + simulation_templates = { + AgentRole.RESEARCHER: { + "findings": f"Research on '{task.description}' reveals multiple perspectives and data points.", + "sources": ["Academic studies", "Industry reports", "Expert interviews"], + "key_points": [ + "Significant trends identified in the domain", + "Multiple stakeholder 
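# Sketch of the keyword-based extraction that AgentMemory._extract_knowledge
# performs above: scan a message for trigger words and file it in a dict keyed
# by sender, keyword and running index. The sample message is invented.
from datetime import datetime

KEYWORDS = ("finding", "conclusion", "recommendation", "insight", "pattern")

def extract_knowledge(knowledge_base, sender, content):
    lower = content.lower()
    for keyword in KEYWORDS:
        if keyword in lower:
            key = f"{sender}_{keyword}_{len(knowledge_base)}"
            knowledge_base[key] = {
                "content": content,
                "sender": sender,
                "timestamp": datetime.now(),
                "type": keyword,
            }

kb = {}
extract_knowledge(kb, "Researcher-1",
                  "Key finding: remote teams report higher written-communication load.")
print(list(kb))  # e.g. ['Researcher-1_finding_0']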
perspectives considered", + "Historical context provides important insights" + ] + }, + AgentRole.ANALYST: { + "analysis": f"Analysis of '{task.description}' shows clear patterns and relationships.", + "patterns": [{"description": "Trend identified", "type": "trend"}], + "insights": ["Data suggests strong correlations", "Multiple factors influence outcomes"], + "confidence_metrics": {"overall_confidence": 0.85} + }, + AgentRole.CRITIC: { + "evaluation": f"Critical evaluation of '{task.description}' identifies strengths and areas for improvement.", + "strengths": ["Comprehensive approach", "Well-structured analysis"], + "gaps": ["Additional data needed", "Alternative perspectives to consider"], + "quality_score": {"overall": 0.78} + }, + AgentRole.SYNTHESIZER: { + "synthesis": f"Synthesis of findings on '{task.description}' provides integrated insights.", + "key_themes": [{"theme": "Theme 1", "description": "Central finding"}], + "final_recommendations": ["Strategic recommendation 1", "Strategic recommendation 2"], + "executive_summary": "Comprehensive analysis reveals actionable insights." + } + } + + return simulation_templates.get(self.role, {"result": "Task completed"}) + def _calculate_confidence(self, result: Any) -> float: """Calculate confidence score for the result.""" - # Basic confidence calculation - can be overridden by subclasses - if result and isinstance(result, str) and len(result) > 50: - return min(0.9, 0.5 + len(result) / 1000) - return 0.5 + # Enhanced confidence calculation + base_confidence = 0.5 + + if result and isinstance(result, dict): + # Check for quality indicators + if 'confidence_metrics' in result: + return result['confidence_metrics'].get('overall_confidence', base_confidence) + + # Calculate based on result completeness + expected_keys = {'findings', 'analysis', 'evaluation', 'synthesis'} + actual_keys = set(result.keys()) + completeness = len(actual_keys.intersection(expected_keys)) / len(expected_keys) + + # Calculate based on content length + content_length = sum(len(str(v)) for v in result.values() if isinstance(v, (str, list))) + length_factor = min(1.0, content_length / 500) + + confidence = base_confidence + (completeness * 0.3) + (length_factor * 0.2) + return min(0.95, confidence) + + return base_confidence - async def collaborate(self, other_agent: 'BaseAgent', message: AgentMessage): + async def collaborate(self, other_agent: 'BaseAgent', message: AgentMessage) -> AgentMessage: """Handle collaboration with another agent.""" self.memory.add_message(message) + self.collaboration_partners.add(other_agent.name) + self.performance_tracker.record_collaboration() # Process collaboration request - response = await self._process_collaboration(message) + response_content = await self._process_collaboration(message) - # Send response back + # Create response message response_message = AgentMessage( sender=self.name, recipient=other_agent.name, - content=response, - message_type="response" + content=response_content, + message_type=MessageType.INFORMATION_SHARING, + priority=message.priority ) other_agent.memory.add_message(response_message) + self.performance_tracker.record_message() return response_message async def _process_collaboration(self, message: AgentMessage) -> str: """Process collaboration message - to be implemented by subclasses.""" - return f"Acknowledged message from {message.sender}" + return f"{self.name} acknowledges collaboration request from {message.sender}" + + def get_status_summary(self) -> Dict[str, Any]: + """Get current status 
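# Minimal sketch of the completeness/length heuristic behind
# BaseAgent._calculate_confidence above; the sample result dict is made up.
EXPECTED_KEYS = {"findings", "analysis", "evaluation", "synthesis"}

def confidence(result: dict) -> float:
    if "confidence_metrics" in result:
        return result["confidence_metrics"].get("overall_confidence", 0.5)
    completeness = len(EXPECTED_KEYS & set(result)) / len(EXPECTED_KEYS)
    content_length = sum(len(str(v)) for v in result.values() if isinstance(v, (str, list)))
    return min(0.95, 0.5 + completeness * 0.3 + min(1.0, content_length / 500) * 0.2)

sample = {"findings": "Three converging sources on productivity.", "sources": ["report A"]}
print(round(confidence(sample), 3))  # one expected key and short content -> roughly 0.60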
summary of the agent.""" + return { + 'name': self.name, + 'role': self.role.value, + 'active': self.active, + 'current_task': self.current_task.description if self.current_task else None, + 'completed_tasks': len(self.completed_tasks), + 'average_confidence': np.mean([t.confidence for t in self.completed_tasks]) if self.completed_tasks else 0, + 'collaboration_count': len(self.collaboration_partners), + 'memory_size': len(self.memory.messages) + } class ResearcherAgent(BaseAgent): """Agent specialized in researching and gathering information.""" - def __init__(self, name: str, llm: ChatOpenAI): + def __init__(self, name: str, llm: Optional[ChatOpenAI] = None): super().__init__(name, AgentRole.RESEARCHER, llm) self.research_sources: List[str] = [] + self.research_methods = ["literature_review", "data_collection", "expert_consultation", "field_research"] async def _execute_task(self, task: Task) -> Any: """Execute research task.""" @@ -232,68 +465,119 @@ class ResearcherAgent(BaseAgent): 1. Break down complex topics into research questions 2. Identify key information sources and data points 3. Provide detailed, factual information with citations where possible - 4. Flag areas requiring further investigation"""), + 4. Flag areas requiring further investigation + 5. Maintain objectivity and consider multiple perspectives"""), HumanMessage(content=f"Research the following: {task.description}") ]) response = await self.llm.ainvoke(prompt.format_messages()) - # Extract research findings + # Extract and structure research findings research_result = { "findings": response.content, "sources": self._extract_sources(response.content), "key_points": self._extract_key_points(response.content), - "areas_for_investigation": self._identify_gaps(response.content) + "areas_for_investigation": self._identify_gaps(response.content), + "research_quality_score": self._assess_research_quality(response.content) } + # Update internal knowledge + self.memory.update_context('latest_research', research_result) + return research_result def _extract_sources(self, content: str) -> List[str]: """Extract potential sources from research content.""" - # Simplified source extraction sources = [] + source_indicators = ['source:', 'reference:', 'based on:', 'according to', 'study:', 'report:'] + lines = content.split('\n') for line in lines: - if any(keyword in line.lower() for keyword in ['source:', 'reference:', 'based on:', 'according to']): - sources.append(line.strip()) - return sources[:5] # Limit to top 5 sources + line_lower = line.lower() + for indicator in source_indicators: + if indicator in line_lower: + sources.append(line.strip()) + break + + return sources[:10] # Limit to top 10 sources def _extract_key_points(self, content: str) -> List[str]: """Extract key points from research.""" key_points = [] lines = content.split('\n') + for line in lines: - if line.strip() and (line.strip()[0].isdigit() or line.strip().startswith('-')): - key_points.append(line.strip()) - return key_points[:10] # Limit to top 10 points + line = line.strip() + # Check for numbered or bulleted points + if line and (line[0].isdigit() or line.startswith('-') or line.startswith('•')): + key_points.append(line) + # Check for key phrases + elif any(phrase in line.lower() for phrase in ['key finding:', 'important:', 'notably:']): + key_points.append(line) + + return key_points[:15] # Limit to top 15 points def _identify_gaps(self, content: str) -> List[str]: """Identify areas needing more research.""" gaps = [] - keywords = ['unclear', 'requires 
further', 'need more', 'investigate', 'unknown'] - lines = content.split('\n') - for line in lines: - if any(keyword in line.lower() for keyword in keywords): - gaps.append(line.strip()) + gap_indicators = ['unclear', 'requires further', 'need more', 'investigate', + 'unknown', 'limited data', 'insufficient evidence'] + + sentences = content.split('.') + for sentence in sentences: + sentence_lower = sentence.lower() + if any(indicator in sentence_lower for indicator in gap_indicators): + gaps.append(sentence.strip() + '.') + return gaps[:5] + + def _assess_research_quality(self, content: str) -> float: + """Assess the quality of research output.""" + quality_score = 0.5 # Base score + + # Check for sources + if self._extract_sources(content): + quality_score += 0.15 + + # Check for structured content + if self._extract_key_points(content): + quality_score += 0.15 + + # Check for comprehensive coverage + word_count = len(content.split()) + if word_count > 300: + quality_score += 0.1 + + # Check for analytical depth + analytical_terms = ['analysis', 'evaluation', 'comparison', 'contrast', 'implication'] + if any(term in content.lower() for term in analytical_terms): + quality_score += 0.1 + + return min(1.0, quality_score) class AnalystAgent(BaseAgent): """Agent specialized in analyzing data and identifying patterns.""" - def __init__(self, name: str, llm: ChatOpenAI): + def __init__(self, name: str, llm: Optional[ChatOpenAI] = None): super().__init__(name, AgentRole.ANALYST, llm) - self.analysis_methods: List[str] = ["statistical", "comparative", "trend", "causal"] + self.analysis_methods = ["statistical", "comparative", "trend", "causal", "predictive"] + self.analysis_frameworks = ["SWOT", "PESTLE", "Porter's Five Forces", "Cost-Benefit"] async def _execute_task(self, task: Task) -> Any: """Execute analysis task.""" + # Get context from previous research if available + context = self._get_relevant_context(task) + prompt = ChatPromptTemplate.from_messages([ SystemMessage(content="""You are an Analyst Agent specializing in data analysis and pattern recognition. Your role is to: 1. Analyze information systematically and objectively 2. Identify patterns, trends, and correlations 3. Provide quantitative insights where possible - 4. Draw logical conclusions based on evidence"""), - HumanMessage(content=f"Analyze the following: {task.description}") + 4. Draw logical conclusions based on evidence + 5. Apply appropriate analytical frameworks + 6. 
Consider multiple analytical perspectives"""), + HumanMessage(content=f"Analyze the following: {task.description}\n\nContext: {context}") ]) response = await self.llm.ainvoke(prompt.format_messages()) @@ -304,88 +588,158 @@ class AnalystAgent(BaseAgent): "patterns": self._identify_patterns(response.content), "insights": self._extract_insights(response.content), "recommendations": self._generate_recommendations(response.content), - "confidence_metrics": self._calculate_analysis_confidence(response.content) + "confidence_metrics": self._calculate_analysis_confidence(response.content), + "analytical_framework": self._identify_framework_used(response.content) } + # Store analysis in memory + self.memory.update_context('latest_analysis', analysis_result) + return analysis_result + def _get_relevant_context(self, task: Task) -> str: + """Get relevant context from memory for the task.""" + context_items = [] + + # Get recent messages related to the task + recent_messages = self.memory.get_recent_messages(5) + for msg in recent_messages: + if task.description.lower() in msg.content.lower(): + context_items.append(f"Previous finding: {msg.content[:200]}...") + + # Get knowledge base items + for key, knowledge in self.memory.knowledge_base.items(): + if 'finding' in knowledge['type'] or 'insight' in knowledge['type']: + context_items.append(f"Known insight: {knowledge['content'][:200]}...") + + return "\n".join(context_items[:3]) # Limit context items + def _identify_patterns(self, content: str) -> List[Dict[str, str]]: """Identify patterns in the analysis.""" patterns = [] - pattern_keywords = ['pattern', 'trend', 'correlation', 'relationship', 'consistent'] + pattern_types = { + 'trend': ['trend', 'increasing', 'decreasing', 'growth', 'decline'], + 'correlation': ['correlation', 'relationship', 'associated', 'linked'], + 'cyclical': ['cycle', 'periodic', 'seasonal', 'recurring'], + 'anomaly': ['anomaly', 'outlier', 'unusual', 'exceptional'] + } - lines = content.split('\n') - for line in lines: - if any(keyword in line.lower() for keyword in pattern_keywords): - patterns.append({ - "description": line.strip(), - "type": self._classify_pattern(line) - }) + sentences = content.split('.') + for sentence in sentences: + sentence_lower = sentence.lower() + for pattern_type, keywords in pattern_types.items(): + if any(keyword in sentence_lower for keyword in keywords): + patterns.append({ + "description": sentence.strip() + '.', + "type": pattern_type, + "confidence": 0.8 + }) + break - return patterns[:5] - - def _classify_pattern(self, description: str) -> str: - """Classify the type of pattern.""" - description_lower = description.lower() - if 'trend' in description_lower: - return 'trend' - elif 'correlation' in description_lower: - return 'correlation' - elif 'cycle' in description_lower or 'periodic' in description_lower: - return 'cyclical' - else: - return 'general' + return patterns[:8] def _extract_insights(self, content: str) -> List[str]: """Extract key insights from analysis.""" insights = [] - insight_keywords = ['shows', 'indicates', 'suggests', 'reveals', 'demonstrates'] + insight_indicators = ['shows', 'indicates', 'suggests', 'reveals', + 'demonstrates', 'implies', 'means that', 'therefore'] sentences = content.split('.') for sentence in sentences: - if any(keyword in sentence.lower() for keyword in insight_keywords): + sentence_lower = sentence.lower() + if any(indicator in sentence_lower for indicator in insight_indicators): insights.append(sentence.strip() + '.') - return 
insights[:7] + return insights[:10] def _generate_recommendations(self, content: str) -> List[str]: """Generate recommendations based on analysis.""" recommendations = [] - rec_keywords = ['recommend', 'suggest', 'should', 'consider', 'advise'] + rec_indicators = ['recommend', 'suggest', 'should', 'consider', + 'advise', 'propose', 'it would be beneficial'] sentences = content.split('.') for sentence in sentences: - if any(keyword in sentence.lower() for keyword in rec_keywords): + sentence_lower = sentence.lower() + if any(indicator in sentence_lower for indicator in rec_indicators): recommendations.append(sentence.strip() + '.') - return recommendations[:5] + # Prioritize recommendations + prioritized = [] + for rec in recommendations: + priority = "high" if any(word in rec.lower() for word in ['critical', 'essential', 'must']) else "medium" + prioritized.append({"recommendation": rec, "priority": priority}) + + return prioritized[:7] def _calculate_analysis_confidence(self, content: str) -> Dict[str, float]: """Calculate confidence metrics for the analysis.""" - word_count = len(content.split()) - evidence_count = content.lower().count('evidence') + content.lower().count('data') + content.lower().count('shows') - uncertainty_count = content.lower().count('may') + content.lower().count('might') + content.lower().count('possibly') + # Count evidence indicators + evidence_count = sum(content.lower().count(word) for word in ['evidence', 'data', 'shows', 'proves']) + uncertainty_count = sum(content.lower().count(word) for word in ['may', 'might', 'possibly', 'perhaps']) + + # Calculate confidence scores + evidence_strength = min(1.0, evidence_count / 10) + certainty_level = max(0.0, 1.0 - (uncertainty_count / 10)) - confidence = min(0.95, 0.5 + (evidence_count * 0.1) - (uncertainty_count * 0.05) + (word_count / 1000)) + # Check for quantitative analysis + quantitative_indicators = ['percentage', '%', 'ratio', 'correlation', 'statistical'] + quantitative_score = 0.7 if any(ind in content.lower() for ind in quantitative_indicators) else 0.5 + + overall_confidence = (evidence_strength + certainty_level + quantitative_score) / 3 return { - "overall_confidence": confidence, - "evidence_strength": min(1.0, evidence_count / 10), - "certainty_level": max(0.0, 1.0 - (uncertainty_count / 10)) + "overall_confidence": overall_confidence, + "evidence_strength": evidence_strength, + "certainty_level": certainty_level, + "quantitative_score": quantitative_score } + + def _identify_framework_used(self, content: str) -> Optional[str]: + """Identify which analytical framework was used.""" + content_lower = content.lower() + + for framework in self.analysis_frameworks: + if framework.lower() in content_lower: + return framework + + # Check for implicit framework usage + if all(word in content_lower for word in ['strength', 'weakness', 'opportunity', 'threat']): + return "SWOT" + elif any(word in content_lower for word in ['political', 'economic', 'social', 'technological']): + return "PESTLE" + + return None class CriticAgent(BaseAgent): """Agent specialized in critical evaluation and quality assurance.""" - def __init__(self, name: str, llm: ChatOpenAI): + def __init__(self, name: str, llm: Optional[ChatOpenAI] = None): super().__init__(name, AgentRole.CRITIC, llm) self.evaluation_criteria = [ "accuracy", "completeness", "logic", "evidence", - "clarity", "relevance", "consistency" + "clarity", "relevance", "consistency", "objectivity" ] + self.evaluation_rubric = self._create_evaluation_rubric() + + def 
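# Small sketch of the framework-detection heuristic in _identify_framework_used
# above: look for an explicit framework name first, then fall back to implicit
# keyword checks for SWOT and PESTLE. The sample sentence is invented.
from typing import Optional

FRAMEWORKS = ("SWOT", "PESTLE", "Porter's Five Forces", "Cost-Benefit")

def identify_framework(text: str) -> Optional[str]:
    lower = text.lower()
    for framework in FRAMEWORKS:
        if framework.lower() in lower:
            return framework
    if all(word in lower for word in ("strength", "weakness", "opportunity", "threat")):
        return "SWOT"
    if any(word in lower for word in ("political", "economic", "social", "technological")):
        return "PESTLE"
    return None

print(identify_framework("Key strengths and weaknesses, plus one opportunity and a threat."))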
_create_evaluation_rubric(self) -> Dict[str, Dict[str, float]]: + """Create evaluation rubric with weighted criteria.""" + return { + "accuracy": {"weight": 0.20, "score": 0.0}, + "completeness": {"weight": 0.15, "score": 0.0}, + "logic": {"weight": 0.15, "score": 0.0}, + "evidence": {"weight": 0.15, "score": 0.0}, + "clarity": {"weight": 0.10, "score": 0.0}, + "relevance": {"weight": 0.10, "score": 0.0}, + "consistency": {"weight": 0.10, "score": 0.0}, + "objectivity": {"weight": 0.05, "score": 0.0} + } async def _execute_task(self, task: Task) -> Any: """Execute critical evaluation task.""" + # Get content to evaluate from context + evaluation_context = self._gather_evaluation_context(task) + prompt = ChatPromptTemplate.from_messages([ SystemMessage(content="""You are a Critic Agent specializing in rigorous evaluation and quality assurance. Your role is to: @@ -393,8 +747,10 @@ class CriticAgent(BaseAgent): 2. Identify weaknesses, gaps, and potential biases 3. Verify logical consistency and evidence quality 4. Suggest improvements and alternative perspectives - 5. Ensure high standards of analysis"""), - HumanMessage(content=f"Critically evaluate the following: {task.description}") + 5. Ensure high standards of analysis + 6. Apply systematic evaluation criteria + 7. Provide constructive feedback"""), + HumanMessage(content=f"Critically evaluate the following: {task.description}\n\nContent to evaluate: {evaluation_context}") ]) response = await self.llm.ainvoke(prompt.format_messages()) @@ -406,84 +762,260 @@ class CriticAgent(BaseAgent): "weaknesses": self._identify_weaknesses(response.content), "gaps": self._identify_gaps(response.content), "improvements": self._suggest_improvements(response.content), - "quality_score": self._calculate_quality_score(response.content) + "quality_score": self._calculate_quality_score(response.content), + "alternative_perspectives": self._identify_alternatives(response.content), + "final_verdict": self._generate_verdict(response.content) } + # Update evaluation history + self.memory.update_context('evaluation_history', critique_result) + return critique_result - def _identify_strengths(self, content: str) -> List[str]: + def _gather_evaluation_context(self, task: Task) -> str: + """Gather relevant context for evaluation.""" + context_items = [] + + # Get recent analysis and research results + recent_messages = self.memory.get_recent_messages(10) + for msg in recent_messages: + if msg.message_type in [MessageType.COMPLETION_REPORT, MessageType.INFORMATION_SHARING]: + context_items.append(f"{msg.sender}: {msg.content[:300]}...") + + # Get knowledge base insights + for key, knowledge in self.memory.knowledge_base.items(): + if knowledge['type'] in ['finding', 'conclusion', 'insight']: + context_items.append(f"Previous {knowledge['type']}: {knowledge['content'][:200]}...") + + return "\n\n".join(context_items[:5]) + + def _identify_strengths(self, content: str) -> List[Dict[str, str]]: """Identify strengths in the evaluated content.""" strengths = [] - strength_keywords = ['strong', 'excellent', 'well', 'good', 'effective', 'solid'] + strength_indicators = ['strong', 'excellent', 'well', 'good', 'effective', + 'solid', 'robust', 'comprehensive', 'thorough'] sentences = content.split('.') for sentence in sentences: - if any(keyword in sentence.lower() for keyword in strength_keywords): - strengths.append(sentence.strip() + '.') + sentence_lower = sentence.lower() + for indicator in strength_indicators: + if indicator in sentence_lower: + strengths.append({ + 
"strength": sentence.strip() + '.', + "category": self._categorize_strength(sentence), + "impact": "high" if any(word in sentence_lower for word in ['very', 'extremely', 'highly']) else "medium" + }) + break + + return strengths[:6] + + def _categorize_strength(self, sentence: str) -> str: + """Categorize the type of strength identified.""" + sentence_lower = sentence.lower() - return strengths[:5] + if any(word in sentence_lower for word in ['method', 'approach', 'framework']): + return "methodology" + elif any(word in sentence_lower for word in ['data', 'evidence', 'support']): + return "evidence" + elif any(word in sentence_lower for word in ['logic', 'reasoning', 'argument']): + return "reasoning" + elif any(word in sentence_lower for word in ['clear', 'organized', 'structured']): + return "presentation" + else: + return "general" - def _identify_weaknesses(self, content: str) -> List[str]: + def _identify_weaknesses(self, content: str) -> List[Dict[str, str]]: """Identify weaknesses in the evaluated content.""" weaknesses = [] - weakness_keywords = ['weak', 'lack', 'insufficient', 'poor', 'inadequate', 'missing'] + weakness_indicators = ['weak', 'lack', 'insufficient', 'poor', 'inadequate', + 'missing', 'limited', 'unclear', 'vague'] sentences = content.split('.') for sentence in sentences: - if any(keyword in sentence.lower() for keyword in weakness_keywords): - weaknesses.append(sentence.strip() + '.') + sentence_lower = sentence.lower() + for indicator in weakness_indicators: + if indicator in sentence_lower: + weaknesses.append({ + "weakness": sentence.strip() + '.', + "severity": self._assess_severity(sentence), + "category": self._categorize_weakness(sentence) + }) + break + + return weaknesses[:6] + + def _assess_severity(self, sentence: str) -> str: + """Assess the severity of a weakness.""" + sentence_lower = sentence.lower() - return weaknesses[:5] + if any(word in sentence_lower for word in ['critical', 'severe', 'major', 'significant']): + return "high" + elif any(word in sentence_lower for word in ['moderate', 'some', 'partial']): + return "medium" + else: + return "low" + + def _categorize_weakness(self, sentence: str) -> str: + """Categorize the type of weakness identified.""" + sentence_lower = sentence.lower() + + if any(word in sentence_lower for word in ['data', 'evidence', 'support']): + return "evidence" + elif any(word in sentence_lower for word in ['logic', 'reasoning', 'argument']): + return "reasoning" + elif any(word in sentence_lower for word in ['bias', 'objective', 'neutral']): + return "objectivity" + elif any(word in sentence_lower for word in ['complete', 'comprehensive', 'thorough']): + return "completeness" + else: + return "general" def _identify_gaps(self, content: str) -> List[str]: """Identify gaps in the analysis.""" gaps = [] - gap_keywords = ['gap', 'missing', 'overlook', 'fail to', 'does not address', 'ignores'] + gap_indicators = ['gap', 'missing', 'overlook', 'fail to', 'does not address', + 'ignores', 'omits', 'neglects'] sentences = content.split('.') for sentence in sentences: - if any(keyword in sentence.lower() for keyword in gap_keywords): + sentence_lower = sentence.lower() + if any(indicator in sentence_lower for indicator in gap_indicators): gaps.append(sentence.strip() + '.') return gaps[:5] - def _suggest_improvements(self, content: str) -> List[str]: + def _suggest_improvements(self, content: str) -> List[Dict[str, str]]: """Suggest improvements based on critique.""" improvements = [] - improvement_keywords = ['could', 'should', 
'improve', 'enhance', 'strengthen', 'add'] + improvement_indicators = ['could', 'should', 'improve', 'enhance', + 'strengthen', 'add', 'consider', 'recommend'] sentences = content.split('.') for sentence in sentences: - if any(keyword in sentence.lower() for keyword in improvement_keywords): - improvements.append(sentence.strip() + '.') + sentence_lower = sentence.lower() + if any(indicator in sentence_lower for indicator in improvement_indicators): + improvements.append({ + "suggestion": sentence.strip() + '.', + "priority": self._prioritize_improvement(sentence), + "effort": self._estimate_effort(sentence) + }) + + return improvements[:7] + + def _prioritize_improvement(self, sentence: str) -> str: + """Prioritize improvement suggestions.""" + sentence_lower = sentence.lower() - return improvements[:5] + if any(word in sentence_lower for word in ['critical', 'essential', 'must', 'urgent']): + return "high" + elif any(word in sentence_lower for word in ['should', 'important', 'recommend']): + return "medium" + else: + return "low" + + def _estimate_effort(self, sentence: str) -> str: + """Estimate effort required for improvement.""" + sentence_lower = sentence.lower() + + if any(word in sentence_lower for word in ['simple', 'easy', 'quick', 'minor']): + return "low" + elif any(word in sentence_lower for word in ['moderate', 'some', 'reasonable']): + return "medium" + elif any(word in sentence_lower for word in ['significant', 'substantial', 'major']): + return "high" + else: + return "medium" def _calculate_quality_score(self, content: str) -> Dict[str, float]: - """Calculate quality scores for different criteria.""" - scores = {} + """Calculate detailed quality scores.""" + scores = self.evaluation_rubric.copy() + content_lower = content.lower() + # Score each criterion based on content analysis for criterion in self.evaluation_criteria: - # Simplified scoring based on keyword presence - positive_count = content.lower().count(criterion) + content.lower().count('good') + content.lower().count('strong') - negative_count = content.lower().count('poor') + content.lower().count('weak') + content.lower().count('lacking') + score = 0.5 # Base score + + # Positive indicators + if criterion in content_lower and any(word in content_lower for word in ['good', 'strong', 'excellent']): + score += 0.3 + + # Negative indicators + if criterion in content_lower and any(word in content_lower for word in ['poor', 'weak', 'lacking']): + score -= 0.3 - score = min(1.0, max(0.0, 0.5 + (positive_count * 0.1) - (negative_count * 0.15))) - scores[criterion] = score + scores[criterion]["score"] = max(0.0, min(1.0, score)) + + # Calculate overall score + overall = sum(scores[c]["score"] * scores[c]["weight"] for c in scores) + + return { + "overall": overall, + "breakdown": {c: scores[c]["score"] for c in scores}, + "grade": self._convert_to_grade(overall) + } + + def _convert_to_grade(self, score: float) -> str: + """Convert numeric score to letter grade.""" + if score >= 0.9: + return "A" + elif score >= 0.8: + return "B" + elif score >= 0.7: + return "C" + elif score >= 0.6: + return "D" + else: + return "F" + + def _identify_alternatives(self, content: str) -> List[str]: + """Identify alternative perspectives mentioned.""" + alternatives = [] + alternative_indicators = ['alternatively', 'another perspective', 'different approach', + 'could also', 'different view', 'alternative'] - scores['overall'] = sum(scores.values()) / len(scores) + sentences = content.split('.') + for sentence in sentences: + 
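# Sketch of the weighted-rubric scoring used by CriticAgent._calculate_quality_score
# above: each criterion gets a 0-1 score and the overall grade is the weight-adjusted
# sum, converted to a letter. The per-criterion scores below are hypothetical.
RUBRIC_WEIGHTS = {
    "accuracy": 0.20, "completeness": 0.15, "logic": 0.15, "evidence": 0.15,
    "clarity": 0.10, "relevance": 0.10, "consistency": 0.10, "objectivity": 0.05,
}

def overall_quality(scores: dict) -> tuple:
    overall = sum(scores.get(c, 0.5) * w for c, w in RUBRIC_WEIGHTS.items())
    grade = "A" if overall >= 0.9 else "B" if overall >= 0.8 else \
            "C" if overall >= 0.7 else "D" if overall >= 0.6 else "F"
    return overall, grade

print(overall_quality({"accuracy": 0.8, "evidence": 0.6, "clarity": 0.9}))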
sentence_lower = sentence.lower() + if any(indicator in sentence_lower for indicator in alternative_indicators): + alternatives.append(sentence.strip() + '.') + + return alternatives[:4] + + def _generate_verdict(self, content: str) -> Dict[str, str]: + """Generate final verdict based on evaluation.""" + # Simple verdict generation based on content sentiment + positive_count = sum(content.lower().count(word) for word in ['good', 'strong', 'excellent', 'effective']) + negative_count = sum(content.lower().count(word) for word in ['poor', 'weak', 'lacking', 'insufficient']) + + if positive_count > negative_count * 2: + verdict = "Approved with minor revisions" + confidence = "high" + elif positive_count > negative_count: + verdict = "Approved with moderate revisions" + confidence = "medium" + else: + verdict = "Requires significant improvements" + confidence = "medium" - return scores + return { + "verdict": verdict, + "confidence": confidence, + "summary": "Based on comprehensive evaluation across multiple criteria." + } class SynthesizerAgent(BaseAgent): """Agent specialized in synthesizing information and creating coherent narratives.""" - def __init__(self, name: str, llm: ChatOpenAI): + def __init__(self, name: str, llm: Optional[ChatOpenAI] = None): super().__init__(name, AgentRole.SYNTHESIZER, llm) - self.synthesis_strategies = ["integrate", "summarize", "reconcile", "consolidate"] + self.synthesis_strategies = ["integrate", "summarize", "reconcile", "consolidate", "harmonize"] + self.output_formats = ["executive_summary", "detailed_report", "action_plan", "strategic_recommendation"] async def _execute_task(self, task: Task) -> Any: """Execute synthesis task.""" + # Gather all relevant information from previous agents + synthesis_input = self._gather_synthesis_input(task) + prompt = ChatPromptTemplate.from_messages([ SystemMessage(content="""You are a Synthesizer Agent specializing in integrating diverse information. Your role is to: @@ -491,8 +1023,10 @@ class SynthesizerAgent(BaseAgent): 2. Resolve contradictions and find common ground 3. Create comprehensive summaries that capture key insights 4. Generate actionable conclusions and recommendations - 5. Ensure clarity and accessibility of complex information"""), - HumanMessage(content=f"Synthesize the following information: {task.description}") + 5. Ensure clarity and accessibility of complex information + 6. Prioritize information based on relevance and impact + 7. 
Create structured outputs suitable for decision-making"""), + HumanMessage(content=f"Synthesize the following information: {task.description}\n\nInput data: {synthesis_input}") ]) response = await self.llm.ainvoke(prompt.format_messages()) @@ -504,116 +1038,452 @@ class SynthesizerAgent(BaseAgent): "consensus_points": self._identify_consensus(response.content), "contradictions": self._identify_contradictions(response.content), "final_recommendations": self._generate_final_recommendations(response.content), - "executive_summary": self._create_executive_summary(response.content) + "executive_summary": self._create_executive_summary(response.content), + "action_items": self._extract_action_items(response.content), + "confidence_level": self._assess_synthesis_confidence(response.content) } + # Store synthesis for future reference + self.memory.update_context('latest_synthesis', synthesis_result) + return synthesis_result - def _extract_themes(self, content: str) -> List[Dict[str, str]]: + def _gather_synthesis_input(self, task: Task) -> str: + """Gather all relevant information for synthesis.""" + input_sections = [] + + # Get findings from all agents + agent_findings = {} + for msg in self.memory.get_recent_messages(20): + if msg.sender not in agent_findings: + agent_findings[msg.sender] = [] + agent_findings[msg.sender].append(msg.content[:500]) + + # Structure input by agent type + for agent, findings in agent_findings.items(): + if findings: + input_sections.append(f"\n{agent} Contributions:\n" + "\n".join(findings[:3])) + + # Add knowledge base insights + knowledge_items = [] + for key, knowledge in self.memory.knowledge_base.items(): + knowledge_items.append(f"{knowledge['type'].title()}: {knowledge['content'][:200]}...") + + if knowledge_items: + input_sections.append("\nKnowledge Base:\n" + "\n".join(knowledge_items[:5])) + + return "\n".join(input_sections) + + def _extract_themes(self, content: str) -> List[Dict[str, Any]]: """Extract major themes from synthesis.""" themes = [] - theme_keywords = ['theme', 'pattern', 'trend', 'common', 'recurring', 'central'] + theme_indicators = ['theme', 'pattern', 'trend', 'common', 'recurring', + 'central', 'key finding', 'main point'] + # Split into paragraphs and analyze paragraphs = content.split('\n\n') - for i, paragraph in enumerate(paragraphs[:5]): # Limit to first 5 paragraphs - if any(keyword in paragraph.lower() for keyword in theme_keywords): + theme_count = 0 + + for paragraph in paragraphs: + paragraph_lower = paragraph.lower() + if any(indicator in paragraph_lower for indicator in theme_indicators): + theme_count += 1 themes.append({ - "theme": f"Theme {i+1}", - "description": paragraph.strip()[:200] + "..." if len(paragraph) > 200 else paragraph.strip() + "theme": f"Theme {theme_count}", + "description": paragraph.strip()[:300] + "..." if len(paragraph) > 300 else paragraph.strip(), + "importance": self._assess_theme_importance(paragraph), + "support_level": self._assess_support_level(paragraph) }) - return themes + # If no explicit themes found, extract from content structure + if not themes and paragraphs: + for i, paragraph in enumerate(paragraphs[:5]): + if len(paragraph.strip()) > 50: + themes.append({ + "theme": f"Finding {i+1}", + "description": paragraph.strip()[:300] + "..." 
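# Compact sketch of the grouping step in _gather_synthesis_input above: bucket
# recent messages by sender, then emit one "Contributions" section per agent
# (first few items, truncated). The tuples stand in for AgentMessage objects.
from collections import defaultdict

def build_synthesis_input(messages, per_agent=3):
    findings = defaultdict(list)
    for sender, content in messages:
        findings[sender].append(content[:500])
    sections = [f"\n{agent} Contributions:\n" + "\n".join(items[:per_agent])
                for agent, items in findings.items() if items]
    return "\n".join(sections)

demo = [("Researcher-1", "Finding: hybrid schedules dominate survey responses."),
        ("Analyst-1", "Insight: meeting load correlates with reported fatigue.")]
print(build_synthesis_input(demo))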
if len(paragraph) > 300 else paragraph.strip(), + "importance": "medium", + "support_level": "moderate" + }) + + return themes[:6] + + def _assess_theme_importance(self, content: str) -> str: + """Assess the importance of a theme.""" + content_lower = content.lower() + + high_importance_indicators = ['critical', 'essential', 'fundamental', 'crucial', 'vital'] + if any(indicator in content_lower for indicator in high_importance_indicators): + return "high" + + low_importance_indicators = ['minor', 'secondary', 'marginal', 'peripheral'] + if any(indicator in content_lower for indicator in low_importance_indicators): + return "low" + + return "medium" + + def _assess_support_level(self, content: str) -> str: + """Assess the level of support for a theme.""" + content_lower = content.lower() + + strong_support = ['consensus', 'unanimous', 'clear evidence', 'strongly supported'] + if any(indicator in content_lower for indicator in strong_support): + return "strong" + + weak_support = ['limited evidence', 'some indication', 'preliminary', 'tentative'] + if any(indicator in content_lower for indicator in weak_support): + return "weak" + + return "moderate" - def _identify_consensus(self, content: str) -> List[str]: + def _identify_consensus(self, content: str) -> List[Dict[str, str]]: """Identify points of consensus.""" - consensus = [] - consensus_keywords = ['agree', 'consensus', 'common', 'shared', 'unanimous', 'consistent'] + consensus_points = [] + consensus_indicators = ['agree', 'consensus', 'common', 'shared', 'unanimous', + 'consistent', 'alignment', 'convergence'] sentences = content.split('.') for sentence in sentences: - if any(keyword in sentence.lower() for keyword in consensus_keywords): - consensus.append(sentence.strip() + '.') + sentence_lower = sentence.lower() + if any(indicator in sentence_lower for indicator in consensus_indicators): + consensus_points.append({ + "point": sentence.strip() + '.', + "strength": "strong" if "unanimous" in sentence_lower or "clear consensus" in sentence_lower else "moderate" + }) - return consensus[:5] + return consensus_points[:6] - def _identify_contradictions(self, content: str) -> List[str]: + def _identify_contradictions(self, content: str) -> List[Dict[str, str]]: """Identify contradictions or conflicts.""" contradictions = [] - conflict_keywords = ['however', 'contrary', 'conflict', 'disagree', 'opposing', 'contradicts'] + conflict_indicators = ['however', 'contrary', 'conflict', 'disagree', 'opposing', + 'contradicts', 'tension', 'divergent', 'inconsistent'] sentences = content.split('.') for sentence in sentences: - if any(keyword in sentence.lower() for keyword in conflict_keywords): - contradictions.append(sentence.strip() + '.') + sentence_lower = sentence.lower() + if any(indicator in sentence_lower for indicator in conflict_indicators): + contradictions.append({ + "contradiction": sentence.strip() + '.', + "resolution_suggested": self._check_for_resolution(sentence), + "impact": self._assess_contradiction_impact(sentence) + }) - return contradictions[:3] + return contradictions[:4] + + def _check_for_resolution(self, sentence: str) -> bool: + """Check if a resolution is suggested for the contradiction.""" + resolution_indicators = ['can be resolved', 'reconcile', 'bridge', 'common ground', 'compromise'] + return any(indicator in sentence.lower() for indicator in resolution_indicators) + + def _assess_contradiction_impact(self, sentence: str) -> str: + """Assess the impact of a contradiction.""" + sentence_lower = sentence.lower() + 
+ if any(word in sentence_lower for word in ['fundamental', 'major', 'significant']): + return "high" + elif any(word in sentence_lower for word in ['minor', 'small', 'slight']): + return "low" + else: + return "medium" - def _generate_final_recommendations(self, content: str) -> List[str]: + def _generate_final_recommendations(self, content: str) -> List[Dict[str, Any]]: """Generate final synthesized recommendations.""" recommendations = [] - # Look for recommendation sections - lines = content.split('\n') - in_recommendations = False + # Extract recommendation sentences + rec_indicators = ['recommend', 'suggest', 'propose', 'advise', 'should', 'must'] + sentences = content.split('.') - for line in lines: - if 'recommend' in line.lower() or 'conclusion' in line.lower(): - in_recommendations = True - elif in_recommendations and line.strip(): - recommendations.append(line.strip()) - if len(recommendations) >= 5: - break + for sentence in sentences: + sentence_lower = sentence.lower() + if any(indicator in sentence_lower for indicator in rec_indicators): + recommendations.append({ + "recommendation": sentence.strip() + '.', + "priority": self._determine_priority(sentence), + "timeframe": self._determine_timeframe(sentence), + "category": self._categorize_recommendation(sentence) + }) + + # Sort by priority + priority_order = {"high": 0, "medium": 1, "low": 2} + recommendations.sort(key=lambda x: priority_order.get(x["priority"], 3)) + + return recommendations[:8] + + def _determine_priority(self, sentence: str) -> str: + """Determine recommendation priority.""" + sentence_lower = sentence.lower() - return recommendations + if any(word in sentence_lower for word in ['urgent', 'immediate', 'critical', 'must']): + return "high" + elif any(word in sentence_lower for word in ['should', 'important', 'recommend']): + return "medium" + else: + return "low" + + def _determine_timeframe(self, sentence: str) -> str: + """Determine recommendation timeframe.""" + sentence_lower = sentence.lower() + + if any(word in sentence_lower for word in ['immediate', 'now', 'urgent', 'asap']): + return "immediate" + elif any(word in sentence_lower for word in ['short-term', 'soon', 'near']): + return "short-term" + elif any(word in sentence_lower for word in ['long-term', 'future', 'eventually']): + return "long-term" + else: + return "medium-term" + + def _categorize_recommendation(self, sentence: str) -> str: + """Categorize the type of recommendation.""" + sentence_lower = sentence.lower() + + if any(word in sentence_lower for word in ['strategy', 'strategic', 'plan']): + return "strategic" + elif any(word in sentence_lower for word in ['operational', 'process', 'procedure']): + return "operational" + elif any(word in sentence_lower for word in ['tactical', 'action', 'implement']): + return "tactical" + else: + return "general" def _create_executive_summary(self, content: str) -> str: """Create an executive summary of the synthesis.""" - # Take first paragraph or create summary + # Extract key sentences for summary + summary_parts = [] + + # Get opening statement paragraphs = content.split('\n\n') if paragraphs: - summary = paragraphs[0][:300] - if len(paragraphs[0]) > 300: - summary += "..." - return summary - return "Summary generation in progress..." + opening = paragraphs[0][:200] + if len(paragraphs[0]) > 200: + opening += "..." 
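# Sketch of the priority ordering applied in _generate_final_recommendations above:
# keyword-derived priority labels are mapped to ranks so "high" items sort first.
# The recommendation texts are invented.
def priority_of(sentence: str) -> str:
    lower = sentence.lower()
    if any(w in lower for w in ("urgent", "immediate", "critical", "must")):
        return "high"
    if any(w in lower for w in ("should", "important", "recommend")):
        return "medium"
    return "low"

recs = ["Teams could pilot a four-day schedule.",
        "Management must address meeting overload immediately.",
        "A shared documentation hub should be introduced."]
ranked = sorted(({"recommendation": r, "priority": priority_of(r)} for r in recs),
                key=lambda x: {"high": 0, "medium": 1, "low": 2}[x["priority"]])
print([r["priority"] for r in ranked])  # ['high', 'medium', 'low']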
+ summary_parts.append(opening) + + # Extract key findings + key_finding_indicators = ['key finding', 'main conclusion', 'importantly', 'notably'] + for paragraph in paragraphs[1:]: + if any(indicator in paragraph.lower() for indicator in key_finding_indicators): + summary_parts.append(paragraph[:150] + "..." if len(paragraph) > 150 else paragraph) + if len(summary_parts) >= 3: + break + + # Add conclusion if present + if len(paragraphs) > 1: + conclusion = paragraphs[-1][:150] + if conclusion not in summary_parts: + summary_parts.append(conclusion + "..." if len(paragraphs[-1]) > 150 else conclusion) + + return " ".join(summary_parts) + + def _extract_action_items(self, content: str) -> List[Dict[str, str]]: + """Extract specific action items from synthesis.""" + action_items = [] + action_indicators = ['action:', 'task:', 'todo:', 'action item', 'next step', 'to do'] + + lines = content.split('\n') + for line in lines: + line_lower = line.lower() + if any(indicator in line_lower for indicator in action_indicators): + action_items.append({ + "action": line.strip(), + "owner": "TBD", + "deadline": "TBD", + "status": "pending" + }) + # Also check for numbered action items + elif line.strip() and line.strip()[0].isdigit() and 'action' in line_lower: + action_items.append({ + "action": line.strip(), + "owner": "TBD", + "deadline": "TBD", + "status": "pending" + }) + + return action_items[:10] + + def _assess_synthesis_confidence(self, content: str) -> Dict[str, Any]: + """Assess confidence in the synthesis.""" + # Calculate various confidence indicators + word_count = len(content.split()) + + # Check for confidence language + high_confidence_words = ['clear', 'strong', 'definitive', 'conclusive', 'certain'] + low_confidence_words = ['uncertain', 'unclear', 'tentative', 'preliminary', 'limited'] + + high_conf_count = sum(content.lower().count(word) for word in high_confidence_words) + low_conf_count = sum(content.lower().count(word) for word in low_confidence_words) + + # Calculate confidence score + base_confidence = 0.7 + confidence_adjustment = (high_conf_count * 0.05) - (low_conf_count * 0.08) + overall_confidence = max(0.3, min(0.95, base_confidence + confidence_adjustment)) + + return { + "overall": overall_confidence, + "level": "high" if overall_confidence > 0.8 else "medium" if overall_confidence > 0.6 else "low", + "factors": { + "content_depth": word_count > 500, + "evidence_strength": high_conf_count > low_conf_count, + "consensus_level": "consensus" in content.lower() + } + } class CoordinatorAgent(BaseAgent): """Agent responsible for coordinating other agents and managing workflow.""" - def __init__(self, name: str, llm: ChatOpenAI): + def __init__(self, name: str, llm: Optional[ChatOpenAI] = None): super().__init__(name, AgentRole.COORDINATOR, llm) self.agents: Dict[str, BaseAgent] = {} self.task_queue: List[Task] = [] self.completed_tasks: List[Task] = [] self.workflow_graph = nx.DiGraph() self.execution_history: List[Dict[str, Any]] = [] + self.workflow_templates = self._create_workflow_templates() + + def _create_workflow_templates(self) -> Dict[str, List[Dict[str, Any]]]: + """Create predefined workflow templates for common problem types.""" + return { + "research_analysis": [ + {"role": "Researcher", "task": "Gather comprehensive information"}, + {"role": "Analyst", "task": "Analyze findings and identify patterns"}, + {"role": "Critic", "task": "Evaluate analysis quality"}, + {"role": "Synthesizer", "task": "Create final recommendations"} + ], + "strategic_planning": [ + 
{"role": "Researcher", "task": "Research current state and trends"}, + {"role": "Analyst", "task": "SWOT analysis and opportunity identification"}, + {"role": "Researcher", "task": "Benchmark best practices"}, + {"role": "Critic", "task": "Risk assessment and gap analysis"}, + {"role": "Synthesizer", "task": "Strategic plan synthesis"} + ], + "problem_solving": [ + {"role": "Researcher", "task": "Define problem and gather context"}, + {"role": "Analyst", "task": "Root cause analysis"}, + {"role": "Researcher", "task": "Research potential solutions"}, + {"role": "Critic", "task": "Evaluate solution feasibility"}, + {"role": "Synthesizer", "task": "Recommend optimal solution"} + ] + } def register_agent(self, agent: BaseAgent): """Register an agent with the coordinator.""" self.agents[agent.name] = agent self.workflow_graph.add_node(agent.name, role=agent.role.value) + logger.info(f"Registered agent: {agent.name} with role {agent.role.value}") - async def decompose_problem(self, problem: str) -> List[Task]: + async def decompose_problem(self, problem: str, use_template: bool = False) -> List[Task]: """Decompose a complex problem into subtasks.""" - prompt = ChatPromptTemplate.from_messages([ - SystemMessage(content="""You are a Coordinator Agent responsible for breaking down complex problems. - Decompose the problem into specific subtasks that can be assigned to specialized agents: - - Researcher: For gathering information and facts - - Analyst: For analyzing data and identifying patterns - - Critic: For evaluating quality and identifying issues - - Synthesizer: For combining insights and creating summaries + if use_template: + # Try to match problem to a template + template_tasks = self._match_problem_to_template(problem) + if template_tasks: + return template_tasks + + if self.llm: + # Use LLM to decompose problem + prompt = ChatPromptTemplate.from_messages([ + SystemMessage(content="""You are a Coordinator Agent responsible for breaking down complex problems. + Decompose the problem into specific subtasks that can be assigned to specialized agents: + - Researcher: For gathering information and facts + - Analyst: For analyzing data and identifying patterns + - Critic: For evaluating quality and identifying issues + - Synthesizer: For combining insights and creating summaries + + Create 4-8 clear, actionable subtasks with dependencies. 
+ Format each task as: [Role]: [Specific task description]"""), + HumanMessage(content=f"Decompose this problem into subtasks: {problem}") + ]) - Create clear, actionable subtasks with dependencies."""), - HumanMessage(content=f"Decompose this problem into subtasks: {problem}") - ]) + response = await self.llm.ainvoke(prompt.format_messages()) + tasks = self._parse_tasks(response.content, problem) + else: + # Demo mode - use template or default decomposition + tasks = self._create_default_tasks(problem) - response = await self.llm.ainvoke(prompt.format_messages()) + # Enhance tasks with metadata + for i, task in enumerate(tasks): + task.metadata['problem_complexity'] = self._assess_problem_complexity(problem) + task.metadata['estimated_duration'] = self._estimate_task_duration(task) + + return tasks + + def _match_problem_to_template(self, problem: str) -> Optional[List[Task]]: + """Match problem to a workflow template.""" + problem_lower = problem.lower() + + # Check for template matches + if any(word in problem_lower for word in ['strategy', 'strategic', 'plan']): + template_name = "strategic_planning" + elif any(word in problem_lower for word in ['research', 'analyze', 'investigate']): + template_name = "research_analysis" + elif any(word in problem_lower for word in ['problem', 'solve', 'solution', 'fix']): + template_name = "problem_solving" + else: + return None + + # Create tasks from template + template = self.workflow_templates[template_name] + tasks = [] - # Parse response into tasks - tasks = self._parse_tasks(response.content, problem) + for i, step in enumerate(template): + task = Task( + id=f"task_{i+1}", + description=f"{step['task']} for: {problem}", + metadata={ + "original_problem": problem, + "suggested_role": step['role'], + "template": template_name + } + ) + tasks.append(task) return tasks + def _assess_problem_complexity(self, problem: str) -> str: + """Assess the complexity of a problem.""" + # Simple heuristic based on problem characteristics + complexity_indicators = { + "high": ['multiple', 'complex', 'comprehensive', 'strategic', 'long-term'], + "medium": ['analyze', 'evaluate', 'develop', 'assess'], + "low": ['simple', 'basic', 'straightforward', 'identify'] + } + + problem_lower = problem.lower() + + for level, indicators in complexity_indicators.items(): + if any(indicator in problem_lower for indicator in indicators): + return level + + # Default based on length + return "high" if len(problem) > 200 else "medium" + + def _estimate_task_duration(self, task: Task) -> float: + """Estimate task duration in seconds.""" + # Base estimation on task characteristics + base_duration = 30.0 + + # Adjust based on role + role_multipliers = { + "Researcher": 1.2, + "Analyst": 1.5, + "Critic": 1.0, + "Synthesizer": 1.3 + } + + role = task.metadata.get("suggested_role", "Researcher") + duration = base_duration * role_multipliers.get(role, 1.0) + + # Adjust based on complexity + complexity = task.metadata.get("problem_complexity", "medium") + if complexity == "high": + duration *= 1.5 + elif complexity == "low": + duration *= 0.7 + + return duration + def _parse_tasks(self, content: str, original_problem: str) -> List[Task]: """Parse LLM response into Task objects.""" tasks = [] @@ -626,60 +1496,116 @@ class CoordinatorAgent(BaseAgent): line = line.strip() if not line: continue - - # Check if line indicates a role + + # Check for role indicators + role_found = False for role in AgentRole: - if role.value in line: + if role.value in line or role.value.lower() in line.lower(): 
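# Sketch of how _match_problem_to_template above turns a workflow template into
# tasks: pick a template by keyword, then stamp one task per step with the target
# role recorded in metadata. SimpleTask is a stand-in for the Task dataclass
# defined earlier in the file, and the template here is an abridged copy of
# the strategic_planning template.
from dataclasses import dataclass, field

@dataclass
class SimpleTask:
    id: str
    description: str
    metadata: dict = field(default_factory=dict)

TEMPLATES = {
    "strategic_planning": [("Researcher", "Research current state and trends"),
                           ("Analyst", "SWOT analysis and opportunity identification"),
                           ("Synthesizer", "Strategic plan synthesis")],
}

def tasks_from_template(problem: str, template_name: str):
    return [SimpleTask(id=f"task_{i+1}",
                       description=f"{step} for: {problem}",
                       metadata={"suggested_role": role, "template": template_name})
            for i, (role, step) in enumerate(TEMPLATES[template_name])]

for t in tasks_from_template("sustainable urban transportation", "strategic_planning"):
    print(t.id, "->", t.metadata["suggested_role"])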
current_role = role.value + role_found = True break - # If line starts with number or dash, it's likely a task - if (line[0].isdigit() or line.startswith('-')) and current_role: - # Extract task description - task_desc = line.lstrip('0123456789.-').strip() - - task = Task( - id=f"task_{task_id}", - description=task_desc, - metadata={ - "original_problem": original_problem, - "suggested_role": current_role - } - ) - - tasks.append(task) - task_id += 1 + # Extract task if we have a role + if current_role and ':' in line: + # Extract task description after role mention + task_parts = line.split(':', 1) + if len(task_parts) > 1: + task_desc = task_parts[1].strip() + + task = Task( + id=f"task_{task_id}", + description=task_desc, + metadata={ + "original_problem": original_problem, + "suggested_role": current_role, + "source": "llm_decomposition" + } + ) + + tasks.append(task) + task_id += 1 - # If no tasks were parsed, create default tasks - if not tasks: - tasks = [ - Task(id="task_1", description=f"Research background information on: {original_problem}", - metadata={"suggested_role": "Researcher"}), - Task(id="task_2", description=f"Analyze key aspects of: {original_problem}", - metadata={"suggested_role": "Analyst"}), - Task(id="task_3", description="Critically evaluate the research and analysis", - metadata={"suggested_role": "Critic"}), - Task(id="task_4", description="Synthesize all findings into actionable insights", - metadata={"suggested_role": "Synthesizer"}) - ] + # Ensure we have at least some tasks + if len(tasks) < 4: + tasks.extend(self._create_default_tasks(original_problem)[len(tasks):]) return tasks + def _create_default_tasks(self, problem: str) -> List[Task]: + """Create default tasks for a problem.""" + return [ + Task( + id="task_1", + description=f"Research comprehensive background information on: {problem}", + metadata={"suggested_role": "Researcher", "source": "default"} + ), + Task( + id="task_2", + description=f"Analyze key factors and patterns related to: {problem}", + metadata={"suggested_role": "Analyst", "source": "default"} + ), + Task( + id="task_3", + description="Critically evaluate the research findings and analysis quality", + metadata={"suggested_role": "Critic", "source": "default"} + ), + Task( + id="task_4", + description="Synthesize all findings into actionable insights and recommendations", + metadata={"suggested_role": "Synthesizer", "source": "default"} + ) + ] + + def _build_dependency_graph(self, tasks: List[Task]): + """Build a dependency graph for tasks.""" + # Define role execution order + role_order = { + "Researcher": 1, + "Analyst": 2, + "Critic": 3, + "Synthesizer": 4 + } + + # Sort tasks by role order + sorted_tasks = sorted(tasks, + key=lambda t: role_order.get(t.metadata.get("suggested_role", "Researcher"), 5)) + + # Create dependencies based on order + for i in range(len(sorted_tasks) - 1): + current_task = sorted_tasks[i] + next_task = sorted_tasks[i + 1] + + # Only add dependency if next task has higher order role + current_order = role_order.get(current_task.metadata.get("suggested_role"), 0) + next_order = role_order.get(next_task.metadata.get("suggested_role"), 0) + + if next_order >= current_order: + next_task.dependencies.append(current_task.id) + async def execute_workflow(self, tasks: List[Task], parallel: bool = True) -> Dict[str, Any]: """Execute the workflow with given tasks.""" start_time = datetime.now() + self.performance_tracker.start_tracking() # Build task dependency graph self._build_dependency_graph(tasks) + # Update 
workflow graph + self._update_workflow_graph(tasks) + # Execute tasks - if parallel: - results = await self._execute_parallel(tasks) - else: - results = await self._execute_sequential(tasks) + try: + if parallel: + await self._execute_parallel(tasks) + else: + await self._execute_sequential(tasks) + except Exception as e: + logger.error(f"Workflow execution error: {str(e)}") # Compile final results end_time = datetime.now() + self.performance_tracker.end_tracking() + execution_time = (end_time - start_time).total_seconds() workflow_result = { @@ -688,6 +1614,7 @@ class CoordinatorAgent(BaseAgent): "success_rate": self._calculate_success_rate(tasks), "agent_contributions": self._compile_agent_contributions(tasks), "workflow_graph": self.workflow_graph, + "performance_metrics": self.performance_tracker.get_performance_summary(), "timestamp": datetime.now() } @@ -695,68 +1622,73 @@ class CoordinatorAgent(BaseAgent): return workflow_result - def _build_dependency_graph(self, tasks: List[Task]): - """Build a dependency graph for tasks.""" - # For simplicity, create a linear dependency chain based on suggested roles - role_order = ["Researcher", "Analyst", "Critic", "Synthesizer"] + def _update_workflow_graph(self, tasks: List[Task]): + """Update the workflow graph with task relationships.""" + # Add task nodes + for task in tasks: + self.workflow_graph.add_node( + task.id, + task_description=task.description[:50] + "...", + status=task.status.value + ) - # Group tasks by role - tasks_by_role = {} + # Add edges for dependencies for task in tasks: - role = task.metadata.get("suggested_role", "Researcher") - if role not in tasks_by_role: - tasks_by_role[role] = [] - tasks_by_role[role].append(task) - - # Create dependencies - for i in range(len(role_order) - 1): - current_role = role_order[i] - next_role = role_order[i + 1] - - if current_role in tasks_by_role and next_role in tasks_by_role: - for current_task in tasks_by_role[current_role]: - for next_task in tasks_by_role[next_role]: - next_task.dependencies.append(current_task.id) + for dep_id in task.dependencies: + self.workflow_graph.add_edge(dep_id, task.id) async def _execute_parallel(self, tasks: List[Task]) -> List[Task]: """Execute tasks in parallel where possible.""" completed = set() pending = tasks.copy() - while pending: - # Find tasks that can be executed (no pending dependencies) - ready_tasks = [ - task for task in pending - if all(dep in completed for dep in task.dependencies) - ] - - if not ready_tasks: - # Deadlock prevention - execute first pending task - ready_tasks = [pending[0]] - - # Execute ready tasks in parallel - task_futures = [] - for task in ready_tasks: - agent_name = self._select_agent_for_task(task) - if agent_name and agent_name in self.agents: - agent = self.agents[agent_name] - task_futures.append(agent.process_task(task)) - - # Update workflow graph - self.workflow_graph.add_edge( - self.name, agent_name, - task_id=task.id, - timestamp=datetime.now() - ) - - # Wait for tasks to complete - if task_futures: - completed_tasks = await asyncio.gather(*task_futures) + with ThreadPoolExecutor(max_workers=len(self.agents)) as executor: + while pending: + # Find tasks ready for execution + ready_tasks = [ + task for task in pending + if all(dep in completed for dep in task.dependencies) + ] + + if not ready_tasks: + # Handle potential deadlock + logger.warning("No ready tasks found, executing first pending task") + ready_tasks = [pending[0]] if pending else [] - for task in completed_tasks: - completed.add(task.id) - 
pending.remove(task) - self.completed_tasks.append(task) + if not ready_tasks: + break + + # Submit tasks for parallel execution + future_to_task = {} + for task in ready_tasks: + agent_name = self._select_agent_for_task(task) + if agent_name and agent_name in self.agents: + agent = self.agents[agent_name] + future = executor.submit(asyncio.run, agent.process_task(task)) + future_to_task[future] = (task, agent_name) + + # Wait for tasks to complete + for future in as_completed(future_to_task): + task, agent_name = future_to_task[future] + try: + completed_task = future.result() + completed.add(task.id) + pending.remove(task) + self.completed_tasks.append(completed_task) + + # Update workflow graph + self.workflow_graph.add_edge( + self.name, agent_name, + task_id=task.id, + timestamp=datetime.now().isoformat() + ) + + # Record collaboration + await self._facilitate_collaboration(completed_task, agent_name) + + except Exception as e: + logger.error(f"Task {task.id} failed: {str(e)}") + task.status = TaskStatus.FAILED return tasks @@ -766,16 +1698,19 @@ class CoordinatorAgent(BaseAgent): agent_name = self._select_agent_for_task(task) if agent_name and agent_name in self.agents: agent = self.agents[agent_name] - await agent.process_task(task) + completed_task = await agent.process_task(task) # Update workflow graph self.workflow_graph.add_edge( self.name, agent_name, task_id=task.id, - timestamp=datetime.now() + timestamp=datetime.now().isoformat() ) - self.completed_tasks.append(task) + self.completed_tasks.append(completed_task) + + # Facilitate collaboration + await self._facilitate_collaboration(completed_task, agent_name) return tasks @@ -786,13 +1721,62 @@ class CoordinatorAgent(BaseAgent): # Find agent with matching role for agent_name, agent in self.agents.items(): if agent.role.value == suggested_role: + # Check agent availability + if agent.active and agent.current_task is None: + return agent_name + + # Fallback: find any available agent with the role + for agent_name, agent in self.agents.items(): + if agent.role.value == suggested_role: + return agent_name + + # Last resort: return any available agent + for agent_name, agent in self.agents.items(): + if agent.active: return agent_name - # Fallback to first available agent - return list(self.agents.keys())[0] if self.agents else None + return None - def _calculate_success_rate(self, tasks: List[Task]) -> float: - """Calculate the success rate of task execution.""" + async def _facilitate_collaboration(self, task: Task, agent_name: str): + """Facilitate collaboration between agents after task completion.""" + if not task.result or task.status != TaskStatus.COMPLETED: + return + + # Create collaboration message + collab_message = AgentMessage( + sender=agent_name, + recipient="all", + content=f"Task completed: {task.description}\nKey findings: {str(task.result)[:500]}", + message_type=MessageType.COMPLETION_REPORT, + priority=3 + ) + + # Share with relevant agents + for other_agent_name, other_agent in self.agents.items(): + if other_agent_name != agent_name: + # Determine if collaboration is needed + if self._should_collaborate(task, other_agent): + await other_agent.memory.add_message(collab_message) + self.performance_tracker.record_collaboration() + + def _should_collaborate(self, task: Task, agent: BaseAgent) -> bool: + """Determine if an agent should receive collaboration message.""" + # Synthesizer should receive all completion reports + if agent.role == AgentRole.SYNTHESIZER: + return True + + # Critic should receive 
analysis and research results + if agent.role == AgentRole.CRITIC and task.metadata.get("suggested_role") in ["Researcher", "Analyst"]: + return True + + # Analyst should receive research results + if agent.role == AgentRole.ANALYST and task.metadata.get("suggested_role") == "Researcher": + return True + + return False + + def _calculate_success_rate(self, tasks: List[Task]) -> float: + """Calculate the success rate of task execution.""" if not tasks: return 0.0 @@ -806,18 +1790,80 @@ class CoordinatorAgent(BaseAgent): for agent_name, agent in self.agents.items(): agent_tasks = [task for task in tasks if task.assigned_to == agent_name] - contributions[agent_name] = { - "role": agent.role.value, - "tasks_completed": len(agent_tasks), - "average_confidence": np.mean([task.confidence for task in agent_tasks]) if agent_tasks else 0.0, - "total_execution_time": sum( - (task.completed_at - task.created_at).total_seconds() + if agent_tasks: + total_execution_time = sum( + task.performance_metrics.get('execution_time', 0) for task in agent_tasks - if task.completed_at ) - } + + avg_confidence = np.mean([task.confidence for task in agent_tasks]) + + contributions[agent_name] = { + "role": agent.role.value, + "tasks_completed": len(agent_tasks), + "average_confidence": avg_confidence, + "total_execution_time": total_execution_time, + "collaboration_count": len(agent.collaboration_partners), + "status": agent.get_status_summary() + } + else: + contributions[agent_name] = { + "role": agent.role.value, + "tasks_completed": 0, + "average_confidence": 0.0, + "total_execution_time": 0.0, + "collaboration_count": 0, + "status": agent.get_status_summary() + } return contributions + + def get_workflow_insights(self) -> Dict[str, Any]: + """Get insights about workflow execution patterns.""" + if not self.execution_history: + return {"message": "No execution history available"} + + # Analyze execution patterns + total_executions = len(self.execution_history) + avg_execution_time = np.mean([wf['execution_time'] for wf in self.execution_history]) + avg_success_rate = np.mean([wf['success_rate'] for wf in self.execution_history]) + + # Analyze agent performance + agent_stats = {} + for workflow in self.execution_history: + for agent, contrib in workflow['agent_contributions'].items(): + if agent not in agent_stats: + agent_stats[agent] = { + 'total_tasks': 0, + 'total_time': 0, + 'confidence_scores': [] + } + + agent_stats[agent]['total_tasks'] += contrib['tasks_completed'] + agent_stats[agent]['total_time'] += contrib['total_execution_time'] + if contrib['average_confidence'] > 0: + agent_stats[agent]['confidence_scores'].append(contrib['average_confidence']) + + # Calculate agent efficiency + agent_efficiency = {} + for agent, stats in agent_stats.items(): + if stats['total_tasks'] > 0: + agent_efficiency[agent] = { + 'avg_time_per_task': stats['total_time'] / stats['total_tasks'], + 'avg_confidence': np.mean(stats['confidence_scores']) if stats['confidence_scores'] else 0, + 'total_tasks': stats['total_tasks'] + } + + return { + 'total_workflows_executed': total_executions, + 'average_execution_time': avg_execution_time, + 'average_success_rate': avg_success_rate, + 'agent_efficiency': agent_efficiency, + 'most_efficient_agent': min(agent_efficiency.items(), + key=lambda x: x[1]['avg_time_per_task'])[0] if agent_efficiency else None, + 'highest_quality_agent': max(agent_efficiency.items(), + key=lambda x: x[1]['avg_confidence'])[0] if agent_efficiency else None + } class WorkflowVisualizer: """Handles 
visualization of agent interactions and workflow.""" @@ -825,46 +1871,87 @@ class WorkflowVisualizer: def __init__(self): self.color_map = Config.NODE_COLORS self.layout_cache = {} + self.animation_frames = [] def create_workflow_graph(self, workflow_graph: nx.DiGraph, - active_agents: List[str] = None) -> go.Figure: + active_agents: List[str] = None, + highlight_tasks: List[str] = None) -> go.Figure: """Create an interactive workflow visualization.""" + if len(workflow_graph.nodes()) == 0: + return self._create_empty_graph() + # Use hierarchical layout pos = self._hierarchical_layout(workflow_graph) - # Create edge trace + # Create traces edge_trace = self._create_edge_trace(workflow_graph, pos) + node_trace = self._create_node_trace(workflow_graph, pos, active_agents, highlight_tasks) + + # Add agent status indicators + status_trace = self._create_status_indicators(workflow_graph, pos, active_agents) + + # Create figure with all traces + traces = [edge_trace, node_trace] + if status_trace: + traces.append(status_trace) + + fig = go.Figure( + data=traces, + layout=go.Layout( + title={ + 'text': 'Agent Collaboration Network', + 'x': 0.5, + 'xanchor': 'center', + 'font': {'size': 20, 'color': '#2c3e50'} + }, + titlefont_size=16, + showlegend=False, + hovermode='closest', + margin=dict(b=40, l=40, r=40, t=60), + xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), + yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), + plot_bgcolor='#f8f9fa', + paper_bgcolor='white', + font=dict(family='Inter, sans-serif'), + annotations=self._create_annotations(workflow_graph, pos) + ) + ) - # Create node trace - node_trace = self._create_node_trace(workflow_graph, pos, active_agents) - - # Create figure - fig = go.Figure(data=[edge_trace, node_trace], - layout=go.Layout( - title='Agent Collaboration Network', - titlefont_size=16, - showlegend=False, - hovermode='closest', - margin=dict(b=20, l=5, r=5, t=40), - xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), - yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), - plot_bgcolor='white' - )) + # Add shapes for visual enhancement + fig.update_layout( + shapes=self._create_background_shapes(pos) + ) return fig + def _create_empty_graph(self) -> go.Figure: + """Create empty graph with message.""" + fig = go.Figure() + fig.add_annotation( + text="No workflow data to display.
Start an analysis to see agent interactions.", + xref="paper", yref="paper", + x=0.5, y=0.5, + showarrow=False, + font=dict(size=16, color='#7f8c8d'), + bgcolor='#ecf0f1', + borderpad=20 + ) + fig.update_layout( + height=500, + plot_bgcolor='#f8f9fa', + paper_bgcolor='white' + ) + return fig + def _hierarchical_layout(self, G: nx.DiGraph) -> Dict: """Create hierarchical layout for the graph.""" - if len(G) == 0: - return {} - - # Try to use cached layout if graph structure hasn't changed - graph_hash = hash(tuple(G.edges())) + # Check cache + graph_hash = hash(tuple(sorted(G.edges()))) if graph_hash in self.layout_cache: return self.layout_cache[graph_hash] - # Define role hierarchy + # Define role hierarchy with better spacing hierarchy = { 'Coordinator': 0, 'Researcher': 1, @@ -873,26 +1960,35 @@ class WorkflowVisualizer: 'Synthesizer': 4 } - # Position nodes based on hierarchy + # Group nodes by type + agent_nodes = [n for n in G.nodes() if G.nodes[n].get('role')] + task_nodes = [n for n in G.nodes() if 'task_' in str(n)] + pos = {} - role_counts = {} - for node in G.nodes(): + # Position agent nodes + role_x_positions = {} + for i, (role, level) in enumerate(hierarchy.items()): + role_x_positions[role] = i * 3 - 6 # Spread horizontally + + for node in agent_nodes: role = G.nodes[node].get('role', 'Unknown') level = hierarchy.get(role, 5) - - if level not in role_counts: - role_counts[level] = 0 - - x = role_counts[level] * 2 - (len([n for n in G.nodes() if hierarchy.get(G.nodes[n].get('role'), 5) == level]) - 1) - y = -level * 2 - + x = role_x_positions.get(role, 0) + y = -level * 2.5 pos[node] = (x, y) - role_counts[level] += 1 - # Cache the layout - self.layout_cache[graph_hash] = pos + # Position task nodes in a grid on the right + if task_nodes: + task_start_x = max(role_x_positions.values()) + 4 + cols = 3 + for i, task in enumerate(task_nodes): + row = i // cols + col = i % cols + pos[task] = (task_start_x + col * 2, -row * 1.5) + # Cache layout + self.layout_cache[graph_hash] = pos return pos def _create_edge_trace(self, G: nx.DiGraph, pos: Dict) -> go.Scatter: @@ -901,82 +1997,214 @@ class WorkflowVisualizer: edge_y = [] for edge in G.edges(): - x0, y0 = pos.get(edge[0], (0, 0)) - x1, y1 = pos.get(edge[1], (0, 0)) - edge_x.extend([x0, x1, None]) - edge_y.extend([y0, y1, None]) + if edge[0] in pos and edge[1] in pos: + x0, y0 = pos[edge[0]] + x1, y1 = pos[edge[1]] + + # Add bezier curve points for smoother edges + mid_x = (x0 + x1) / 2 + mid_y = (y0 + y1) / 2 - 0.5 # Slight curve + + # Create curved edge + t = np.linspace(0, 1, 10) + for ti in t: + x = (1-ti)**2 * x0 + 2*(1-ti)*ti * mid_x + ti**2 * x1 + y = (1-ti)**2 * y0 + 2*(1-ti)*ti * mid_y + ti**2 * y1 + edge_x.append(x) + edge_y.append(y) + edge_x.append(None) + edge_y.append(None) edge_trace = go.Scatter( x=edge_x, y=edge_y, - line=dict(width=2, color='#888'), + line=dict(width=2, color='#bdc3c7'), hoverinfo='none', - mode='lines' + mode='lines', + opacity=0.6 ) return edge_trace def _create_node_trace(self, G: nx.DiGraph, pos: Dict, - active_agents: List[str] = None) -> go.Scatter: + active_agents: List[str] = None, + highlight_tasks: List[str] = None) -> go.Scatter: """Create node trace for the graph.""" node_x = [] node_y = [] node_colors = [] node_sizes = [] node_text = [] + node_hover = [] + node_symbols = [] for node in G.nodes(): - x, y = pos.get(node, (0, 0)) - node_x.append(x) - node_y.append(y) - - # Get node attributes - role = G.nodes[node].get('role', 'Unknown') - color = self.color_map.get(role, '#666') - - # 
Highlight active agents - if active_agents and node in active_agents: - size = 30 - color = self._brighten_color(color) - else: - size = 20 - - node_colors.append(color) - node_sizes.append(size) - - # Create hover text - degree = G.degree(node) - hover_text = f"{node}
<br>Role: {role}<br>
Connections: {degree}" - node_text.append(hover_text) + if node in pos: + x, y = pos[node] + node_x.append(x) + node_y.append(y) + + # Get node attributes + node_data = G.nodes[node] + role = node_data.get('role', '') + + # Determine node properties + if role: # Agent node + color = self.color_map.get(role, '#95a5a6') + size = 35 + symbol = 'circle' + + # Highlight active agents + if active_agents and node in active_agents: + size = 45 + color = self._brighten_color(color) + + hover_text = f"{node}
<br>Role: {role}<br>
Status: Active" + display_text = node.split('-')[0] # Show role name + + else: # Task node + status = node_data.get('status', 'pending') + color = self._get_status_color(status) + size = 25 + symbol = 'square' + + # Highlight specific tasks + if highlight_tasks and str(node) in highlight_tasks: + size = 30 + color = self._brighten_color(color) + + task_desc = node_data.get('task_description', 'Task') + hover_text = f"{node}
<br>{task_desc}<br>
Status: {status}" + display_text = node if len(str(node)) < 10 else f"T{node[-1]}" + + node_colors.append(color) + node_sizes.append(size) + node_text.append(display_text) + node_hover.append(hover_text) + node_symbols.append(symbol) node_trace = go.Scatter( x=node_x, y=node_y, mode='markers+text', hoverinfo='text', - text=[node for node in G.nodes()], - textposition="top center", - hovertext=node_text, + text=node_text, + hovertext=node_hover, + textposition="bottom center", + textfont=dict(size=10, color='#2c3e50'), marker=dict( showscale=False, color=node_colors, size=node_sizes, + symbol=node_symbols, line=dict(color='white', width=2) ) ) return node_trace + def _create_status_indicators(self, G: nx.DiGraph, pos: Dict, + active_agents: List[str] = None) -> Optional[go.Scatter]: + """Create status indicators for active agents.""" + if not active_agents: + return None + + indicator_x = [] + indicator_y = [] + + for agent in active_agents: + if agent in pos: + x, y = pos[agent] + # Place indicator above agent + indicator_x.append(x) + indicator_y.append(y + 0.5) + + if not indicator_x: + return None + + return go.Scatter( + x=indicator_x, + y=indicator_y, + mode='markers', + marker=dict( + size=8, + color='#2ecc71', + symbol='circle', + line=dict(width=2, color='white') + ), + hoverinfo='skip' + ) + + def _create_annotations(self, G: nx.DiGraph, pos: Dict) -> List[Dict]: + """Create annotations for the graph.""" + annotations = [] + + # Add role labels + role_positions = {} + for node in G.nodes(): + if G.nodes[node].get('role'): + role = G.nodes[node]['role'] + if role not in role_positions: + x, y = pos[node] + role_positions[role] = (x, y - 0.8) + + for role, (x, y) in role_positions.items(): + annotations.append(dict( + x=x, y=y, + text=f"{role}", + showarrow=False, + font=dict(size=12, color='#34495e'), + opacity=0.8 + )) + + return annotations + + def _create_background_shapes(self, pos: Dict) -> List[Dict]: + """Create background shapes for visual grouping.""" + shapes = [] + + # Group nodes by x-coordinate regions + if pos: + x_coords = [p[0] for p in pos.values()] + y_coords = [p[1] for p in pos.values()] + + if x_coords and y_coords: + # Create subtle background regions + padding = 1.5 + shapes.append(dict( + type='rect', + x0=min(x_coords) - padding, + y0=min(y_coords) - padding, + x1=max(x_coords) + padding, + y1=max(y_coords) + padding, + fillcolor='#ecf0f1', + opacity=0.3, + line=dict(width=0) + )) + + return shapes + + def _get_status_color(self, status: str) -> str: + """Get color based on task status.""" + status_colors = { + 'completed': '#2ecc71', + 'in_progress': '#f39c12', + 'pending': '#95a5a6', + 'failed': '#e74c3c' + } + return status_colors.get(status, '#95a5a6') + def _brighten_color(self, color: str) -> str: """Make a color brighter for highlighting.""" - # Simple brightening by mixing with white if color.startswith('#'): + # Convert hex to RGB r = int(color[1:3], 16) g = int(color[3:5], 16) b = int(color[5:7], 16) - # Mix with white (255, 255, 255) - r = int(r + (255 - r) * 0.3) - g = int(g + (255 - g) * 0.3) - b = int(b + (255 - b) * 0.3) + # Brighten by mixing with white + factor = 0.3 + r = int(r + (255 - r) * factor) + g = int(g + (255 - g) * factor) + b = int(b + (255 - b) * factor) return f"#{r:02x}{g:02x}{b:02x}" @@ -985,64 +2213,113 @@ class WorkflowVisualizer: def create_task_timeline(self, tasks: List[Task]) -> go.Figure: """Create a timeline visualization of task execution.""" - # Prepare data for timeline + # Prepare timeline data timeline_data = [] 
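+        # Build one timeline row per task; tasks without a completed_at timestamp get an
+        # estimated 30-second end time so they still appear on the Gantt chart.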
for task in tasks: - if task.created_at and task.completed_at: + if task.created_at: + # Use completed_at if available, otherwise estimate + end_time = task.completed_at if task.completed_at else task.created_at + timedelta(seconds=30) + timeline_data.append({ 'Task': task.id, 'Agent': task.assigned_to or 'Unassigned', 'Start': task.created_at, - 'Finish': task.completed_at, + 'Finish': end_time, 'Status': task.status.value, 'Confidence': task.confidence }) if not timeline_data: - # Return empty figure if no completed tasks - fig = go.Figure() - fig.add_annotation( - text="No completed tasks to display", - xref="paper", yref="paper", - x=0.5, y=0.5, - showarrow=False - ) - return fig + return self._create_empty_timeline() + # Create DataFrame df = pd.DataFrame(timeline_data) - # Create Gantt chart + # Create Gantt chart with custom styling fig = px.timeline( df, x_start="Start", x_end="Finish", y="Agent", color="Confidence", - hover_data=["Task", "Status", "Confidence"], - title="Task Execution Timeline", - color_continuous_scale="Viridis" + hover_data=["Task", "Status"], + color_continuous_scale="Viridis", + labels={'Confidence': 'Confidence Score'} + ) + + # Update layout for professional appearance + fig.update_layout( + title={ + 'text': 'Task Execution Timeline', + 'x': 0.5, + 'xanchor': 'center', + 'font': {'size': 18, 'color': '#2c3e50'} + }, + height=400, + xaxis_title="Time", + yaxis_title="Agent", + plot_bgcolor='#f8f9fa', + paper_bgcolor='white', + font=dict(family='Inter, sans-serif'), + yaxis={'categoryorder': 'total ascending'} ) - fig.update_yaxis(categoryorder="total ascending") - fig.update_layout(height=400) + # Add grid lines + fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='#e0e0e0') + fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#e0e0e0') return fig + def _create_empty_timeline(self) -> go.Figure: + """Create empty timeline with message.""" + fig = go.Figure() + fig.add_annotation( + text="No task execution data available yet.", + xref="paper", yref="paper", + x=0.5, y=0.5, + showarrow=False, + font=dict(size=14, color='#7f8c8d') + ) + fig.update_layout( + height=400, + plot_bgcolor='#f8f9fa', + paper_bgcolor='white' + ) + return fig + def create_confidence_heatmap(self, agent_contributions: Dict[str, Any]) -> go.Figure: - """Create a heatmap showing agent confidence levels.""" + """Create a heatmap showing agent performance metrics.""" + if not agent_contributions: + return self._create_empty_heatmap() + + # Prepare data agents = list(agent_contributions.keys()) - metrics = ['tasks_completed', 'average_confidence', 'total_execution_time'] + metrics = ['Tasks Completed', 'Avg Confidence', 'Time Efficiency', 'Collaboration Score'] - # Normalize data for heatmap + # Create data matrix data = [] for metric in metrics: row = [] for agent in agents: - value = agent_contributions[agent].get(metric, 0) - if metric == 'total_execution_time': - value = value / 60 # Convert to minutes + contrib = agent_contributions[agent] + + if metric == 'Tasks Completed': + value = contrib.get('tasks_completed', 0) / 10.0 # Normalize + elif metric == 'Avg Confidence': + value = contrib.get('average_confidence', 0) + elif metric == 'Time Efficiency': + # Inverse of average time per task, normalized + time = contrib.get('total_execution_time', 1) + tasks = contrib.get('tasks_completed', 1) + avg_time = time / tasks if tasks > 0 else float('inf') + value = min(1.0, 30.0 / avg_time) if avg_time > 0 else 0 + elif metric == 'Collaboration Score': + value = min(1.0, 
contrib.get('collaboration_count', 0) / 5.0) + else: + value = 0 + row.append(value) data.append(row) @@ -1050,18 +2327,103 @@ class WorkflowVisualizer: fig = go.Figure(data=go.Heatmap( z=data, x=agents, - y=['Tasks Completed', 'Avg Confidence', 'Time (min)'], + y=metrics, colorscale='Blues', text=np.round(data, 2), texttemplate='%{text}', - textfont={"size": 10} + textfont={"size": 12}, + colorbar=dict(title="Score", titleside="right"), + hoverongaps=False )) + # Update layout fig.update_layout( - title="Agent Performance Metrics", + title={ + 'text': 'Agent Performance Metrics', + 'x': 0.5, + 'xanchor': 'center', + 'font': {'size': 18, 'color': '#2c3e50'} + }, xaxis_title="Agents", yaxis_title="Metrics", - height=300 + height=350, + plot_bgcolor='white', + paper_bgcolor='white', + font=dict(family='Inter, sans-serif') + ) + + # Add cell borders + fig.update_xaxes(showgrid=False, showline=True, linewidth=2, linecolor='#2c3e50') + fig.update_yaxes(showgrid=False, showline=True, linewidth=2, linecolor='#2c3e50') + + return fig + + def _create_empty_heatmap(self) -> go.Figure: + """Create empty heatmap with message.""" + fig = go.Figure() + fig.add_annotation( + text="No agent performance data available yet.", + xref="paper", yref="paper", + x=0.5, y=0.5, + showarrow=False, + font=dict(size=14, color='#7f8c8d') + ) + fig.update_layout( + height=350, + plot_bgcolor='#f8f9fa', + paper_bgcolor='white' + ) + return fig + + def create_performance_comparison(self, performance_metrics: Dict[str, Any]) -> go.Figure: + """Create performance comparison visualization.""" + + # Extract metrics + baseline_time = Config.BENCHMARK_BASELINE['single_agent_time'] + actual_time = performance_metrics.get('average_task_completion_time', baseline_time) + time_improvement = performance_metrics.get('time_improvement_percentage', 0) + + # Create bar chart comparing performance + categories = ['Single Agent', 'Multi-Agent System'] + values = [baseline_time, actual_time] + colors = ['#e74c3c', '#2ecc71'] + + fig = go.Figure(data=[ + go.Bar( + x=categories, + y=values, + marker_color=colors, + text=[f'{v:.1f}s' for v in values], + textposition='auto' + ) + ]) + + # Add improvement annotation + fig.add_annotation( + x=1, y=actual_time + 5, + text=f"{time_improvement:.1f}% Faster", + showarrow=True, + arrowhead=2, + arrowsize=1, + arrowwidth=2, + arrowcolor='#2ecc71', + font=dict(size=14, color='#2ecc71', weight='bold') + ) + + # Update layout + fig.update_layout( + title={ + 'text': 'Performance Comparison', + 'x': 0.5, + 'xanchor': 'center', + 'font': {'size': 18, 'color': '#2c3e50'} + }, + yaxis_title="Average Completion Time (seconds)", + height=400, + plot_bgcolor='#f8f9fa', + paper_bgcolor='white', + font=dict(family='Inter, sans-serif'), + showlegend=False ) return fig @@ -1076,7 +2438,8 @@ class ReportGenerator: 'agent_contributions': self._generate_agent_contributions, 'key_findings': self._generate_key_findings, 'recommendations': self._generate_recommendations, - 'confidence_analysis': self._generate_confidence_analysis + 'confidence_analysis': self._generate_confidence_analysis, + 'performance_metrics': self._generate_performance_metrics } def generate_report(self, @@ -1097,21 +2460,36 @@ class ReportGenerator: for section in include_sections: if section in self.section_generators: section_content = self.section_generators[section](workflow_result, problem_statement) - report_sections.append(section_content) + if section_content: + report_sections.append(section_content) # Footer 
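+        # The footer is appended unconditionally so every report closes with generation
+        # metadata, regardless of which sections were selected above.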
report_sections.append(self._generate_footer(workflow_result)) - return "\n\n".join(report_sections) + # Format the complete report + complete_report = "\n\n".join(report_sections) + + # Add CSS styling for better markdown rendering + styled_report = f"""
<div class="markdown-text">
+
+{complete_report}
+
+</div>
""" + + return styled_report def _generate_header(self, problem_statement: str) -> str: """Generate report header.""" return f"""# Multi-Agent Analysis Report -**Generated:** {datetime.now().strftime('%B %d, %Y at %I:%M %p')} +
+**Generated:** {datetime.now().strftime('%B %d, %Y at %I:%M %p')} +**System:** {Config.COMPANY_NAME} **Problem Statement:** {problem_statement} +
+ ---""" def _generate_executive_summary(self, workflow_result: Dict[str, Any], @@ -1120,25 +2498,41 @@ class ReportGenerator: tasks = workflow_result.get('tasks', []) success_rate = workflow_result.get('success_rate', 0) execution_time = workflow_result.get('execution_time', 0) + performance = workflow_result.get('performance_metrics', {}) - # Find synthesis results + # Find synthesis task for summary content synthesis_task = None for task in tasks: if task.assigned_to and 'Synthesizer' in task.assigned_to: synthesis_task = task break + summary_content = "" + if synthesis_task and isinstance(synthesis_task.result, dict): + exec_summary = synthesis_task.result.get('executive_summary', '') + if exec_summary: + summary_content = f"\n\n{exec_summary}" + + # Calculate performance improvement + time_improvement = performance.get('time_improvement_percentage', 0) + efficiency_score = performance.get('efficiency_score', 0) + summary = f"""## Executive Summary -The multi-agent system successfully analyzed the problem with a **{success_rate:.0%} task completion rate** in **{execution_time:.1f} seconds**. +
-""" - - if synthesis_task and synthesis_task.result: - if isinstance(synthesis_task.result, dict) and 'executive_summary' in synthesis_task.result: - summary += synthesis_task.result['executive_summary'] - else: - summary += "The analysis revealed key insights across multiple dimensions of the problem." +**Key Performance Indicators:** +- **Task Completion Rate:** {success_rate:.0%} +- **Total Execution Time:** {execution_time:.1f} seconds +- **Performance Improvement:** {time_improvement:.1f}% faster than single-agent approach +- **System Efficiency Score:** {efficiency_score:.2f}/1.0 + +
+ +The multi-agent system successfully analyzed the problem through coordinated efforts of specialized agents, achieving superior results compared to traditional single-agent approaches. +{summary_content} + +**Bottom Line:** The analysis provides actionable insights with high confidence, leveraging the collective intelligence of multiple specialized agents.""" return summary @@ -1147,19 +2541,28 @@ The multi-agent system successfully analyzed the problem with a **{success_rate: """Generate task analysis section.""" tasks = workflow_result.get('tasks', []) + if not tasks: + return "" + content = "## Task Analysis\n\n" - # Group tasks by status + # Task overview completed_tasks = [t for t in tasks if t.status == TaskStatus.COMPLETED] failed_tasks = [t for t in tasks if t.status == TaskStatus.FAILED] - content += f"**Total Tasks:** {len(tasks)}\n" - content += f"**Completed:** {len(completed_tasks)}\n" - content += f"**Failed:** {len(failed_tasks)}\n\n" - - # List tasks by agent - content += "### Tasks by Agent\n\n" + content += f"""
+ +**Task Overview:** +- Total Tasks: {len(tasks)} +- Completed: {len(completed_tasks)} ✓ +- Failed: {len(failed_tasks)} ✗ +- Average Confidence: {np.mean([t.confidence for t in completed_tasks]) if completed_tasks else 0:.2%} + +
+ +### Task Breakdown by Agent\n\n""" + # Group tasks by agent agents_tasks = {} for task in tasks: agent = task.assigned_to or "Unassigned" @@ -1168,32 +2571,73 @@ The multi-agent system successfully analyzed the problem with a **{success_rate: agents_tasks[agent].append(task) for agent, agent_tasks in agents_tasks.items(): - content += f"**{agent}:**\n" + # Get agent role + role = "Unknown" + for t in agent_tasks: + if t.metadata.get('suggested_role'): + role = t.metadata['suggested_role'] + break + + content += f"#### {agent} ({role})\n\n" + for task in agent_tasks: - status_emoji = "✓" if task.status == TaskStatus.COMPLETED else "✗" - confidence = f"({task.confidence:.0%} confidence)" if task.confidence > 0 else "" - content += f"- {status_emoji} {task.description} {confidence}\n" - content += "\n" + status_icon = "✓" if task.status == TaskStatus.COMPLETED else "✗" + confidence_bar = self._create_confidence_bar(task.confidence) + exec_time = task.performance_metrics.get('execution_time', 0) + + content += f"""
+ +**{status_icon} Task {task.id}** +*Description:* {task.description} +*Execution Time:* {exec_time:.1f}s +*Confidence:* {confidence_bar} {task.confidence:.0%} + +
+""" return content + def _create_confidence_bar(self, confidence: float) -> str: + """Create visual confidence bar.""" + filled = int(confidence * 10) + empty = 10 - filled + return "▰" * filled + "▱" * empty + def _generate_agent_contributions(self, workflow_result: Dict[str, Any], problem_statement: str) -> str: """Generate agent contributions section.""" contributions = workflow_result.get('agent_contributions', {}) + if not contributions: + return "" + content = "## Agent Contributions\n\n" + # Create contribution cards for each agent for agent, stats in contributions.items(): role = stats.get('role', 'Unknown') tasks_completed = stats.get('tasks_completed', 0) avg_confidence = stats.get('average_confidence', 0) exec_time = stats.get('total_execution_time', 0) + collab_count = stats.get('collaboration_count', 0) + + # Get role color + role_color = Config.NODE_COLORS.get(role, '#95a5a6') - content += f"### {agent} ({role})\n" - content += f"- Tasks Completed: {tasks_completed}\n" - content += f"- Average Confidence: {avg_confidence:.0%}\n" - content += f"- Total Execution Time: {exec_time:.1f}s\n\n" + content += f"""
+ +### {agent} +**Role:** {role} + +| Metric | Value | +|--------|-------| +| Tasks Completed | {tasks_completed} | +| Average Confidence | {avg_confidence:.0%} | +| Total Execution Time | {exec_time:.1f}s | +| Collaborations | {collab_count} | + +
+""" return content @@ -1204,37 +2648,50 @@ The multi-agent system successfully analyzed the problem with a **{success_rate: content = "## Key Findings\n\n" + findings_by_type = { + 'Research Findings': [], + 'Analytical Insights': [], + 'Critical Observations': [], + 'Synthesized Conclusions': [] + } + # Extract findings from different agent types for task in tasks: if task.status == TaskStatus.COMPLETED and task.result: - agent_role = task.metadata.get('suggested_role', '') - - if 'Researcher' in agent_role and isinstance(task.result, dict): - if 'key_points' in task.result: - content += "### Research Findings\n" - for point in task.result['key_points'][:5]: - content += f"- {point}\n" - content += "\n" + role = task.metadata.get('suggested_role', '') - elif 'Analyst' in agent_role and isinstance(task.result, dict): - if 'insights' in task.result: - content += "### Analytical Insights\n" - for insight in task.result['insights'][:5]: - content += f"- {insight}\n" - content += "\n" - - elif 'Critic' in agent_role and isinstance(task.result, dict): - if 'strengths' in task.result: - content += "### Identified Strengths\n" - for strength in task.result['strengths'][:3]: - content += f"- {strength}\n" - content += "\n" + if isinstance(task.result, dict): + if 'Researcher' in role: + if 'key_points' in task.result: + findings_by_type['Research Findings'].extend(task.result['key_points'][:3]) + + elif 'Analyst' in role: + if 'insights' in task.result: + findings_by_type['Analytical Insights'].extend(task.result['insights'][:3]) + + elif 'Critic' in role: + if 'strengths' in task.result: + for strength in task.result['strengths'][:2]: + if isinstance(strength, dict): + findings_by_type['Critical Observations'].append(strength.get('strength', str(strength))) + else: + findings_by_type['Critical Observations'].append(str(strength)) - if 'gaps' in task.result: - content += "### Identified Gaps\n" - for gap in task.result['gaps'][:3]: - content += f"- {gap}\n" - content += "\n" + elif 'Synthesizer' in role: + if 'key_themes' in task.result: + for theme in task.result['key_themes'][:2]: + if isinstance(theme, dict): + findings_by_type['Synthesized Conclusions'].append(theme.get('description', str(theme))) + else: + findings_by_type['Synthesized Conclusions'].append(str(theme)) + + # Format findings + for finding_type, findings in findings_by_type.items(): + if findings: + content += f"### {finding_type}\n\n" + for finding in findings: + content += f"- {finding}\n" + content += "\n" return content @@ -1251,53 +2708,67 @@ The multi-agent system successfully analyzed the problem with a **{success_rate: for task in tasks: if task.status == TaskStatus.COMPLETED and task.result: if isinstance(task.result, dict): - if 'recommendations' in task.result: - all_recommendations.extend(task.result['recommendations']) - elif 'final_recommendations' in task.result: - all_recommendations.extend(task.result['final_recommendations']) - elif 'improvements' in task.result: - all_recommendations.extend(task.result['improvements']) - - # Remove duplicates and limit - unique_recommendations = [] - seen = set() - for rec in all_recommendations: - rec_lower = rec.lower() - if rec_lower not in seen: - seen.add(rec_lower) - unique_recommendations.append(rec) - - # Categorize recommendations + # Check different possible recommendation fields + for field in ['recommendations', 'final_recommendations', 'improvements']: + if field in task.result: + recs = task.result[field] + for rec in recs: + if isinstance(rec, dict): + 
all_recommendations.append(rec) + else: + all_recommendations.append({ + 'recommendation': str(rec), + 'priority': 'medium', + 'source': task.assigned_to + }) + + if not all_recommendations: + return content + "*No specific recommendations were generated.*\n" + + # Categorize by priority high_priority = [] medium_priority = [] low_priority = [] - for rec in unique_recommendations[:10]: - # Simple prioritization based on keywords - if any(word in rec.lower() for word in ['critical', 'must', 'essential', 'immediately']): + for rec in all_recommendations: + priority = rec.get('priority', 'medium') + if priority == 'high': high_priority.append(rec) - elif any(word in rec.lower() for word in ['should', 'recommend', 'important']): - medium_priority.append(rec) - else: + elif priority == 'low': low_priority.append(rec) + else: + medium_priority.append(rec) + # Format recommendations if high_priority: - content += "### High Priority\n" - for rec in high_priority: - content += f"- {rec}\n" - content += "\n" + content += """
+ +### 🔴 High Priority + +""" + for rec in high_priority[:3]: + content += f"- {rec.get('recommendation', rec)}\n" + content += "\n
\n" if medium_priority: - content += "### Medium Priority\n" - for rec in medium_priority: - content += f"- {rec}\n" - content += "\n" + content += """
+ +### 🟡 Medium Priority + +""" + for rec in medium_priority[:4]: + content += f"- {rec.get('recommendation', rec)}\n" + content += "\n
\n" if low_priority: - content += "### Low Priority\n" - for rec in low_priority: - content += f"- {rec}\n" - content += "\n" + content += """
+ +### 🟢 Low Priority + +""" + for rec in low_priority[:3]: + content += f"- {rec.get('recommendation', rec)}\n" + content += "\n
\n" return content @@ -1309,28 +2780,102 @@ The multi-agent system successfully analyzed the problem with a **{success_rate: content = "## Confidence Analysis\n\n" - # Overall confidence - overall_confidence = np.mean([t.confidence for t in tasks if t.confidence > 0]) - content += f"**Overall Confidence Score:** {overall_confidence:.0%}\n\n" + # Calculate overall confidence + task_confidences = [t.confidence for t in tasks if t.confidence > 0] + overall_confidence = np.mean(task_confidences) if task_confidences else 0 + + # Create confidence gauge + confidence_level = "High" if overall_confidence > 0.8 else "Medium" if overall_confidence > 0.6 else "Low" + confidence_color = "#2ecc71" if overall_confidence > 0.8 else "#f39c12" if overall_confidence > 0.6 else "#e74c3c" + + content += f"""
+<div style="text-align: center;">
+<h3>Overall Confidence Score</h3>
+<h2 style="color: {confidence_color};">{overall_confidence:.0%}</h2>
+<p>Level: {confidence_level}</p>
+</div>
+ +### Confidence by Agent Role + +""" - # Confidence by agent type - content += "### Confidence by Agent Role\n" + # Create table of agent confidences + content += "| Agent Role | Average Confidence | Tasks |\n" + content += "|------------|-------------------|-------|\n" for agent, stats in contributions.items(): - avg_conf = stats.get('average_confidence', 0) role = stats.get('role', 'Unknown') - content += f"- **{role}**: {avg_conf:.0%}\n" + avg_conf = stats.get('average_confidence', 0) + tasks = stats.get('tasks_completed', 0) + content += f"| {role} | {avg_conf:.0%} | {tasks} |\n" - content += "\n### Confidence Distribution\n" + content += "\n### Confidence Distribution\n\n" # Categorize confidence levels - high_conf = [t for t in tasks if t.confidence >= 0.8] - medium_conf = [t for t in tasks if 0.5 <= t.confidence < 0.8] - low_conf = [t for t in tasks if t.confidence < 0.5] + high_conf = len([t for t in tasks if t.confidence >= 0.8]) + medium_conf = len([t for t in tasks if 0.5 <= t.confidence < 0.8]) + low_conf = len([t for t in tasks if t.confidence < 0.5]) + + content += f"""- **High Confidence (≥80%):** {high_conf} tasks +- **Medium Confidence (50-79%):** {medium_conf} tasks +- **Low Confidence (<50%):** {low_conf} tasks + +*The confidence scores reflect the system's assessment of result quality and reliability based on evidence strength, consistency, and completeness.*""" + + return content + + def _generate_performance_metrics(self, workflow_result: Dict[str, Any], + problem_statement: str) -> str: + """Generate performance metrics section.""" + performance = workflow_result.get('performance_metrics', {}) + + if not performance: + return "" + + content = "## Performance Metrics\n\n" + + # Extract metrics + total_time = performance.get('total_execution_time', 0) + avg_task_time = performance.get('average_task_completion_time', 0) + total_collab = performance.get('total_collaborations', 0) + total_messages = performance.get('total_messages', 0) + efficiency = performance.get('efficiency_score', 0) + time_improvement = performance.get('time_improvement_percentage', 0) + + content += f"""
+ +### System Performance Overview + +| Metric | Value | Benchmark | +|--------|-------|-----------| +| Total Execution Time | {total_time:.1f}s | - | +| Average Task Time | {avg_task_time:.1f}s | {Config.BENCHMARK_BASELINE['single_agent_time']:.1f}s | +| Time Improvement | {time_improvement:.1f}% | Baseline | +| Total Collaborations | {total_collab} | - | +| Message Exchanges | {total_messages} | - | +| Efficiency Score | {efficiency:.2%} | 50% | + +
+ +### Performance Insights + +""" - content += f"- High Confidence (≥80%): {len(high_conf)} tasks\n" - content += f"- Medium Confidence (50-79%): {len(medium_conf)} tasks\n" - content += f"- Low Confidence (<50%): {len(low_conf)} tasks\n" + # Add performance insights + if time_improvement > 30: + content += "- **Exceptional Performance**: The multi-agent system achieved significant time savings through parallel processing.\n" + elif time_improvement > 15: + content += "- **Good Performance**: The system demonstrated efficient task distribution and execution.\n" + else: + content += "- **Standard Performance**: The system completed tasks within expected parameters.\n" + + if efficiency > 0.8: + content += "- **High Efficiency**: Excellent resource utilization and agent coordination.\n" + elif efficiency > 0.6: + content += "- **Moderate Efficiency**: Good balance between speed and quality.\n" + else: + content += "- **Efficiency Opportunity**: Consider optimizing agent workflows for better performance.\n" return content @@ -1341,10 +2886,16 @@ The multi-agent system successfully analyzed the problem with a **{success_rate: return f"""--- -**Report Generation Details:** -- Analysis completed in {execution_time:.1f} seconds -- Report generated at {timestamp.strftime('%B %d, %Y at %I:%M %p')} -- Powered by Multi-Agent AI Collaboration System""" +
+ +**Report Generation Details** +Analysis completed in {execution_time:.1f} seconds +Report generated at {timestamp.strftime('%B %d, %Y at %I:%M %p')} + +*Powered by {Config.COMPANY_NAME}* +*Advanced Multi-Agent AI Collaboration System* + +
""" # Gradio Interface Functions def create_gradio_interface(): @@ -1358,22 +2909,27 @@ def create_gradio_interface(): # State variables current_workflow = None current_problem = "" + demo_mode = False - def initialize_agents(api_key: str, model: str = "gpt-4") -> str: + def initialize_agents(api_key: str, model: str = "gpt-4", use_demo: bool = False) -> str: """Initialize the multi-agent system.""" - nonlocal coordinator + nonlocal coordinator, demo_mode - if not api_key: - return "Please provide an OpenAI API key to initialize the agents." + demo_mode = use_demo + + if not use_demo and not api_key: + return "❌ Please provide an OpenAI API key or enable Demo Mode to initialize the agents." try: - # Initialize LLM - llm = ChatOpenAI( - api_key=api_key, - model=model, - temperature=Config.TEMPERATURE, - max_tokens=Config.MAX_TOKENS - ) + # Initialize LLM only if not in demo mode + llm = None + if not use_demo and api_key: + llm = ChatOpenAI( + api_key=api_key, + model=model, + temperature=Config.TEMPERATURE, + max_tokens=Config.MAX_TOKENS + ) # Create coordinator coordinator = CoordinatorAgent("Coordinator", llm) @@ -1390,42 +2946,48 @@ def create_gradio_interface(): coordinator.register_agent(critic) coordinator.register_agent(synthesizer) - return f"Successfully initialized multi-agent system with {len(coordinator.agents)} agents." + mode_text = "Demo Mode" if use_demo else f"Live Mode ({model})" + return f"✅ Successfully initialized multi-agent system with {len(coordinator.agents)} agents in {mode_text}." except Exception as e: logger.error(f"Error initializing agents: {str(e)}") - return f"Error initializing agents: {str(e)}" + return f"❌ Error initializing agents: {str(e)}" - async def analyze_problem(problem: str, execution_mode: str) -> Tuple[str, Any, Any, Any]: + async def analyze_problem(problem: str, execution_mode: str, use_template: bool = False) -> Tuple[str, Any, Any, Any, Any]: """Analyze a problem using the multi-agent system.""" nonlocal current_workflow, current_problem if not coordinator: - return "Please initialize the agents first.", None, None, None + return "❌ Please initialize the agents first.", None, None, None, None if not problem: - return "Please enter a problem to analyze.", None, None, None + return "❌ Please enter a problem to analyze.", None, None, None, None current_problem = problem try: - # Decompose problem into tasks - status = "Decomposing problem into subtasks..." - tasks = await coordinator.decompose_problem(problem) + # Update status + status = "🔄 Decomposing problem into subtasks..." + + # Decompose problem + tasks = await coordinator.decompose_problem(problem, use_template=use_template) if not tasks: - return "Failed to decompose problem into tasks.", None, None, None + return "❌ Failed to decompose problem into tasks.", None, None, None, None - # Execute workflow - status = f"Executing {len(tasks)} tasks using {execution_mode} mode..." + # Update status + status = f"🔄 Executing {len(tasks)} tasks using {execution_mode} mode..." 
parallel = execution_mode == "Parallel" + # Execute workflow current_workflow = await coordinator.execute_workflow(tasks, parallel=parallel) # Create visualizations + active_agents = list(coordinator.agents.keys()) + workflow_graph = visualizer.create_workflow_graph( current_workflow['workflow_graph'], - active_agents=list(coordinator.agents.keys()) + active_agents=active_agents ) timeline_chart = visualizer.create_task_timeline(tasks) @@ -1434,28 +2996,40 @@ def create_gradio_interface(): current_workflow['agent_contributions'] ) + performance_chart = visualizer.create_performance_comparison( + current_workflow['performance_metrics'] + ) + # Generate status summary success_rate = current_workflow['success_rate'] execution_time = current_workflow['execution_time'] + performance = current_workflow['performance_metrics'] - status = f"""Analysis completed successfully! - + status = f"""✅ **Analysis completed successfully!** + +📊 **Results Summary:** - Tasks executed: {len(tasks)} - Success rate: {success_rate:.0%} - Execution time: {execution_time:.1f} seconds -- Agents involved: {len(coordinator.agents)}""" +- Performance improvement: {performance.get('time_improvement_percentage', 0):.1f}% faster +- Agents involved: {len(coordinator.agents)} + +🤖 **Agent Activity:** +- Total collaborations: {performance.get('total_collaborations', 0)} +- Messages exchanged: {performance.get('total_messages', 0)} +- Efficiency score: {performance.get('efficiency_score', 0):.2%}""" - return status, workflow_graph, timeline_chart, confidence_heatmap + return status, workflow_graph, timeline_chart, confidence_heatmap, performance_chart except Exception as e: logger.error(f"Error analyzing problem: {str(e)}") - return f"Error during analysis: {str(e)}", None, None, None + return f"❌ Error during analysis: {str(e)}", None, None, None, None def generate_report(selected_sections: List[str]) -> str: """Generate a report from the current workflow results.""" if not current_workflow: - return "No analysis results available. Please run an analysis first." + return "❌ No analysis results available. Please run an analysis first." try: report = report_generator.generate_report( @@ -1468,229 +3042,502 @@ def create_gradio_interface(): except Exception as e: logger.error(f"Error generating report: {str(e)}") - return f"Error generating report: {str(e)}" + return f"❌ Error generating report: {str(e)}" def get_agent_details(agent_name: str) -> str: """Get detailed information about a specific agent.""" if not coordinator or agent_name not in coordinator.agents: - return "Agent not found or system not initialized." + return "❌ Agent not found or system not initialized." agent = coordinator.agents[agent_name] + status = agent.get_status_summary() - details = f"""## Agent: {agent.name} + details = f"""## Agent Profile: {agent.name} -**Role:** {agent.role.value} +
-**Status:** {'Active' if agent.active else 'Inactive'} +**Role:** {agent.role.value} +**Status:** {'🟢 Active' if agent.active else '🔴 Inactive'} +**Completed Tasks:** {len(agent.completed_tasks)} +**Current Task:** {agent.current_task.description if agent.current_task else 'None'} +**Average Confidence:** {status['average_confidence']:.0%} +**Collaborations:** {status['collaboration_count']} -**Completed Tasks:** {len(agent.completed_tasks)} +
-**Current Task:** {agent.current_task.description if agent.current_task else 'None'} - -### Recent Tasks: +### Recent Task History """ - for task in agent.completed_tasks[-5:]: - details += f"\n- **{task.id}**: {task.description}" - details += f"\n - Status: {task.status.value}" - details += f"\n - Confidence: {task.confidence:.0%}" - details += f"\n - Execution Time: {(task.completed_at - task.created_at).total_seconds():.1f}s\n" + for i, task in enumerate(agent.completed_tasks[-5:], 1): + status_icon = "✅" if task.status == TaskStatus.COMPLETED else "❌" + exec_time = task.performance_metrics.get('execution_time', 0) + + details += f""" +**{i}. {status_icon} {task.id}** +- Description: {task.description} +- Confidence: {task.confidence:.0%} +- Execution Time: {exec_time:.1f}s +""" + + # Add performance insights + if agent.performance_tracker.metrics['task_completion_times']: + avg_time = np.mean(agent.performance_tracker.metrics['task_completion_times']) + details += f"\n### Performance Statistics\n" + details += f"- Average Task Time: {avg_time:.1f}s\n" + details += f"- Total Active Time: {sum(agent.performance_tracker.metrics['task_completion_times']):.1f}s\n" return details + def get_workflow_insights() -> str: + """Get insights about the multi-agent system performance.""" + + if not coordinator: + return "❌ System not initialized." + + insights = coordinator.get_workflow_insights() + + if insights.get('total_workflows_executed', 0) == 0: + return "📊 No workflow executions yet. Run an analysis to see performance insights." + + content = f"""## Workflow Insights + +
+ +### System Performance Overview + +- **Total Workflows:** {insights['total_workflows_executed']} +- **Average Execution Time:** {insights['average_execution_time']:.1f}s +- **Average Success Rate:** {insights['average_success_rate']:.0%} +- **Most Efficient Agent:** {insights.get('most_efficient_agent', 'N/A')} +- **Highest Quality Agent:** {insights.get('highest_quality_agent', 'N/A')} + +
+ +### Agent Efficiency Rankings +""" + + if insights.get('agent_efficiency'): + content += "\n| Agent | Avg Time/Task | Avg Confidence | Total Tasks |\n" + content += "|-------|---------------|----------------|-------------|\n" + + for agent, efficiency in insights['agent_efficiency'].items(): + content += f"| {agent} | {efficiency['avg_time_per_task']:.1f}s | " + content += f"{efficiency['avg_confidence']:.0%} | {efficiency['total_tasks']} |\n" + + return content + # Create custom CSS for professional styling custom_css = """ .gradio-container { font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', sans-serif; + max-width: 1400px; + margin: 0 auto; + } + + .gr-button { + font-weight: 600; + transition: all 0.3s ease; } .gr-button-primary { - background-color: #2563eb !important; - border-color: #2563eb !important; + background-color: #3498db !important; + border-color: #3498db !important; } .gr-button-primary:hover { - background-color: #1d4ed8 !important; - border-color: #1d4ed8 !important; + background-color: #2980b9 !important; + border-color: #2980b9 !important; + transform: translateY(-1px); + box-shadow: 0 4px 12px rgba(52, 152, 219, 0.3); } - .container { - max-width: 1200px; - margin: 0 auto; + .gr-button-secondary { + background-color: #95a5a6 !important; + border-color: #95a5a6 !important; } h1 { - background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); - -webkit-background-clip: text; - -webkit-text-fill-color: transparent; - text-align: center; - font-size: 2.5rem; + color: #2c3e50; font-weight: 800; - margin-bottom: 1rem; + margin-bottom: 0.5rem; } h2 { - color: #1f2937; - font-size: 1.5rem; + color: #34495e; font-weight: 600; - margin-top: 2rem; - margin-bottom: 1rem; + margin-top: 1.5rem; + } + + h3 { + color: #34495e; + font-weight: 600; + } + + .gr-box { + border-radius: 8px; + border-color: #e0e0e0; + } + + .gr-padded { + padding: 20px; } - .status-box { - background-color: #f3f4f6; + .gr-form { + background-color: #f8f9fa; border-radius: 8px; - padding: 1rem; - margin: 1rem 0; - border-left: 4px solid #3b82f6; + padding: 20px; + border: 1px solid #e0e0e0; + } + + .status-text { + font-size: 14px; + line-height: 1.6; + } + + .markdown-text { + line-height: 1.6; + color: #2c3e50; + } + + .markdown-text h1, .markdown-text h2, .markdown-text h3 { + margin-top: 1.5em; + margin-bottom: 0.75em; + } + + .markdown-text ul, .markdown-text ol { + margin-left: 1.5em; + margin-bottom: 1em; + } + + .markdown-text code { + background-color: #f0f0f0; + padding: 2px 4px; + border-radius: 3px; + font-size: 0.9em; + } + + .markdown-text pre { + background-color: #f0f0f0; + padding: 10px; + border-radius: 5px; + overflow-x: auto; + } + + .tab-nav { + background-color: #f8f9fa; + border-bottom: 2px solid #e0e0e0; + } + + /* Loading animation */ + @keyframes pulse { + 0% { opacity: 0.6; } + 50% { opacity: 1; } + 100% { opacity: 0.6; } + } + + .loading { + animation: pulse 1.5s ease-in-out infinite; } """ # Create Gradio interface - with gr.Blocks(title="Multi-Agent AI Collaboration System", - theme=gr.themes.Base(), - css=custom_css) as interface: - - gr.Markdown(""" - # Multi-Agent AI Collaboration System + with gr.Blocks( + title="Multi-Agent AI Collaboration System", + theme=gr.themes.Base(), + css=custom_css + ) as interface: - Advanced AI system with specialized agents working together to solve complex problems through intelligent task decomposition and parallel processing. + # Header + gr.HTML(""" +
+            <div style="text-align: center;">
+                <h1>
+                    Multi-Agent AI Collaboration System
+                </h1>
+                <p>
+                    Advanced AI agents working together to solve complex problems through intelligent task decomposition and parallel processing
+                </p>
+            </div>
""") # System Configuration - with gr.Row(): - with gr.Column(scale=3): - api_key_input = gr.Textbox( - label="OpenAI API Key", - placeholder="sk-...", - type="password" - ) - with gr.Column(scale=1): - model_select = gr.Dropdown( - choices=["gpt-4", "gpt-3.5-turbo"], - value="gpt-4", - label="Model" - ) - with gr.Column(scale=1): - init_button = gr.Button("Initialize Agents", variant="primary") - - init_status = gr.Textbox(label="Initialization Status", interactive=False) + with gr.Group(): + gr.Markdown("### 🔧 System Configuration") + + with gr.Row(): + with gr.Column(scale=3): + api_key_input = gr.Textbox( + label="OpenAI API Key", + placeholder="sk-...", + type="password", + info="Required for live mode. Leave empty for demo mode." + ) + with gr.Column(scale=1): + model_select = gr.Dropdown( + choices=["gpt-4", "gpt-3.5-turbo"], + value="gpt-4", + label="Model", + info="Select the LLM model" + ) + with gr.Column(scale=1): + demo_mode_checkbox = gr.Checkbox( + label="Demo Mode", + value=False, + info="Run without API key" + ) + with gr.Column(scale=1): + init_button = gr.Button( + "Initialize Agents", + variant="primary", + size="lg" + ) + + init_status = gr.Textbox( + label="Initialization Status", + interactive=False, + elem_classes=["status-text"] + ) # Main tabs - with gr.Tabs(): + with gr.Tabs() as tabs: # Problem Analysis Tab - with gr.TabItem("Problem Analysis"): - gr.Markdown("### Enter a complex problem for multi-agent analysis") - - problem_input = gr.Textbox( - label="Problem Statement", - placeholder="Example: Analyze the potential impact of AI on healthcare delivery in the next 5 years", - lines=3 - ) - - with gr.Row(): - execution_mode = gr.Radio( - choices=["Sequential", "Parallel"], - value="Parallel", - label="Execution Mode" + with gr.TabItem("🔍 Problem Analysis", id=1): + with gr.Group(): + gr.Markdown("### Enter a complex problem for multi-agent analysis") + + problem_input = gr.Textbox( + label="Problem Statement", + placeholder="Example: Analyze the potential impact of AI on healthcare delivery in the next 5 years", + lines=3, + info="Describe a complex problem that requires multiple perspectives" + ) + + with gr.Row(): + with gr.Column(scale=1): + execution_mode = gr.Radio( + choices=["Sequential", "Parallel"], + value="Parallel", + label="Execution Mode", + info="Parallel mode is faster but requires more resources" + ) + with gr.Column(scale=1): + use_template = gr.Checkbox( + label="Use Workflow Template", + value=True, + info="Automatically match to predefined workflows" + ) + with gr.Column(scale=2): + analyze_button = gr.Button( + "🚀 Analyze Problem", + variant="primary", + size="lg" + ) + + analysis_status = gr.Textbox( + label="Analysis Status", + interactive=False, + lines=8, + elem_classes=["status-text"] ) - analyze_button = gr.Button("Analyze Problem", variant="primary") - - analysis_status = gr.Textbox( - label="Analysis Status", - interactive=False, - lines=5 - ) # Visualization outputs - with gr.Row(): - workflow_graph = gr.Plot(label="Agent Collaboration Network") - - with gr.Row(): - timeline_chart = gr.Plot(label="Task Execution Timeline") - confidence_heatmap = gr.Plot(label="Agent Performance Metrics") + with gr.Group(): + gr.Markdown("### 📊 Analysis Visualizations") + + with gr.Row(): + workflow_graph = gr.Plot(label="Agent Collaboration Network") + + with gr.Row(): + with gr.Column(): + timeline_chart = gr.Plot(label="Task Execution Timeline") + with gr.Column(): + confidence_heatmap = gr.Plot(label="Agent Performance Metrics") + + with 
gr.Row(): + performance_chart = gr.Plot(label="Performance Comparison") # Agent Details Tab - with gr.TabItem("Agent Details"): - gr.Markdown("### View detailed information about each agent") - - agent_selector = gr.Dropdown( - choices=["Researcher-1", "Analyst-1", "Critic-1", "Synthesizer-1"], - label="Select Agent" - ) - - agent_details_button = gr.Button("Get Agent Details") + with gr.TabItem("🤖 Agent Details", id=2): + with gr.Group(): + gr.Markdown("### View detailed information about each agent") + + with gr.Row(): + agent_selector = gr.Dropdown( + choices=["Researcher-1", "Analyst-1", "Critic-1", "Synthesizer-1"], + label="Select Agent", + info="Choose an agent to view their profile and performance" + ) + agent_details_button = gr.Button( + "Get Agent Details", + variant="secondary" + ) + + agent_details_output = gr.Markdown( + elem_classes=["markdown-text"] + ) - agent_details_output = gr.Markdown() + with gr.Group(): + gr.Markdown("### 📈 System Insights") + + insights_button = gr.Button( + "Get Workflow Insights", + variant="secondary" + ) + + insights_output = gr.Markdown( + elem_classes=["markdown-text"] + ) # Report Generation Tab - with gr.TabItem("Report Generation"): - gr.Markdown("### Generate comprehensive analysis report") + with gr.TabItem("📄 Report Generation", id=3): + with gr.Group(): + gr.Markdown("### Generate comprehensive analysis report") + + section_selector = gr.CheckboxGroup( + choices=[ + "executive_summary", + "task_analysis", + "agent_contributions", + "key_findings", + "recommendations", + "confidence_analysis", + "performance_metrics" + ], + value=[ + "executive_summary", + "key_findings", + "recommendations", + "confidence_analysis" + ], + label="Select Report Sections", + info="Choose which sections to include in the report" + ) + + generate_report_button = gr.Button( + "📄 Generate Report", + variant="primary", + size="lg" + ) + + report_output = gr.Markdown( + elem_classes=["markdown-text"] + ) + + # Example Problems Tab + with gr.TabItem("💡 Example Problems", id=4): + gr.Markdown(""" + ### Example Problems for Analysis - section_selector = gr.CheckboxGroup( - choices=[ - "executive_summary", - "task_analysis", - "agent_contributions", - "key_findings", - "recommendations", - "confidence_analysis" - ], - value=[ - "executive_summary", - "key_findings", - "recommendations" - ], - label="Select Report Sections" - ) + Click on any example to load it into the analysis tab. These examples demonstrate different types of complex problems suitable for multi-agent analysis. 
+ """) - generate_report_button = gr.Button("Generate Report", variant="primary") + example_problems = [ + { + "title": "🏢 Business Strategy", + "problem": "Develop a comprehensive strategy for a traditional retail company to transition to e-commerce while maintaining customer loyalty and managing existing physical stores", + "description": "Complex business transformation requiring market analysis, risk assessment, and strategic planning" + }, + { + "title": "🔗 Technology Assessment", + "problem": "Evaluate the potential risks and benefits of implementing blockchain technology in supply chain management for a global manufacturing company", + "description": "Technical evaluation requiring understanding of emerging technology and business operations" + }, + { + "title": "🚗 Market Analysis", + "problem": "Analyze the competitive landscape for electric vehicles and identify key success factors for new entrants in the North American market", + "description": "Market research requiring industry analysis, competitor assessment, and trend identification" + }, + { + "title": "🏠 Policy Evaluation", + "problem": "Assess the implications of remote work policies on organizational culture, productivity, and talent retention in technology companies", + "description": "Organizational analysis requiring understanding of human resources, culture, and productivity metrics" + }, + { + "title": "🏥 Innovation Planning", + "problem": "Design an innovation framework for a healthcare organization to integrate AI-powered diagnostic tools while ensuring patient privacy and regulatory compliance", + "description": "Innovation strategy requiring technical, regulatory, and ethical considerations" + } + ] - report_output = gr.Markdown() + example_buttons = [] + for i, example in enumerate(example_problems): + with gr.Group(): + gr.Markdown(f""" + #### {example['title']} + *{example['description']}* + """) + btn = gr.Button( + f"Load This Example", + variant="secondary", + size="sm" + ) + example_buttons.append((btn, example['problem'])) - # Example Problems Tab - with gr.TabItem("Example Problems"): + # Help Tab + with gr.TabItem("❓ Help", id=5): gr.Markdown(""" - ### Example Problems for Analysis + ## How to Use the Multi-Agent AI Collaboration System - Click on any example to load it into the analysis tab: + ### 🚀 Getting Started - 1. **Business Strategy**: "Develop a comprehensive strategy for a traditional retail company to transition to e-commerce while maintaining customer loyalty" + 1. **Initialize the System** + - Enter your OpenAI API key for live analysis (optional) + - Or enable Demo Mode to explore without an API key + - Select your preferred model (GPT-4 recommended) + - Click "Initialize Agents" - 2. **Technology Assessment**: "Evaluate the potential risks and benefits of implementing blockchain technology in supply chain management" + 2. **Analyze a Problem** + - Enter a complex problem in the Problem Analysis tab + - Choose execution mode (Parallel is faster) + - Optionally use workflow templates for common problem types + - Click "Analyze Problem" - 3. **Market Analysis**: "Analyze the competitive landscape for electric vehicles and identify key success factors for new entrants" + 3. **Review Results** + - View the agent collaboration network + - Check the task execution timeline + - Review performance metrics + - Explore individual agent details - 4. **Policy Evaluation**: "Assess the implications of remote work policies on organizational culture and productivity" + 4. 
**Generate Report** + - Select desired report sections + - Click "Generate Report" + - Copy or save the comprehensive analysis - 5. **Innovation Planning**: "Design an innovation framework for a healthcare organization to integrate AI-powered diagnostic tools" - """) + ### 🤖 Understanding the Agents - example_buttons = [] - example_problems = [ - "Develop a comprehensive strategy for a traditional retail company to transition to e-commerce while maintaining customer loyalty", - "Evaluate the potential risks and benefits of implementing blockchain technology in supply chain management", - "Analyze the competitive landscape for electric vehicles and identify key success factors for new entrants", - "Assess the implications of remote work policies on organizational culture and productivity", - "Design an innovation framework for a healthcare organization to integrate AI-powered diagnostic tools" - ] + - **Researcher**: Gathers information and identifies key facts + - **Analyst**: Processes data and identifies patterns + - **Critic**: Evaluates quality and identifies gaps + - **Synthesizer**: Combines insights into actionable recommendations + - **Coordinator**: Manages workflow and facilitates collaboration + + ### 💡 Tips for Best Results + + - Be specific and detailed in your problem statements + - Complex, multi-faceted problems work best + - Use parallel execution for faster results + - Review agent details to understand the analysis process + - Generate reports for comprehensive documentation - for i, problem in enumerate(example_problems): - btn = gr.Button(f"Load Example {i+1}", size="sm") - example_buttons.append(btn) + ### 🔧 Troubleshooting + + - **Initialization fails**: Check your API key or enable Demo Mode + - **Analysis takes too long**: Try Sequential mode or simpler problems + - **Empty visualizations**: Ensure analysis completed successfully + - **Report generation fails**: Check that analysis was completed first + """) # Event handlers init_button.click( fn=initialize_agents, - inputs=[api_key_input, model_select], + inputs=[api_key_input, model_select, demo_mode_checkbox], outputs=init_status ) analyze_button.click( - fn=lambda p, m: asyncio.run(analyze_problem(p, m)), - inputs=[problem_input, execution_mode], - outputs=[analysis_status, workflow_graph, timeline_chart, confidence_heatmap] + fn=lambda p, m, t: asyncio.run(analyze_problem(p, m, t)), + inputs=[problem_input, execution_mode, use_template], + outputs=[analysis_status, workflow_graph, timeline_chart, confidence_heatmap, performance_chart] ) agent_details_button.click( @@ -1699,6 +3546,12 @@ def create_gradio_interface(): outputs=agent_details_output ) + insights_button.click( + fn=get_workflow_insights, + inputs=[], + outputs=insights_output + ) + generate_report_button.click( fn=generate_report, inputs=section_selector, @@ -1706,10 +3559,13 @@ def create_gradio_interface(): ) # Example button handlers - for i, btn in enumerate(example_buttons): + for btn, problem in example_buttons: btn.click( - fn=lambda idx=i: example_problems[idx], + fn=lambda p=problem: p, outputs=problem_input + ).then( + fn=lambda: gr.Tabs.update(selected=1), + outputs=tabs ) return interface @@ -1717,4 +3573,8 @@ def create_gradio_interface(): # Main execution if __name__ == "__main__": interface = create_gradio_interface() - interface.launch(share=True) \ No newline at end of file + interface.launch( + share=True, + favicon_path=None, + show_error=True + ) \ No newline at end of file
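A note on the example-button wiring above: the handler is written as `fn=lambda p=problem: p`, binding each button's problem text through a default argument rather than a plain closure. The snippet below is a minimal, standalone sketch of why that matters (the `problems` list is illustrative only, not part of the app); with a plain closure, every button would load whichever problem the loop saw last.

```python
# Minimal sketch (illustrative data): default-argument binding vs. late-binding closures.
problems = ["Problem A", "Problem B", "Problem C"]

# Plain closures capture the variable, not its value at definition time,
# so every callable here ends up returning the last item.
buggy = [lambda: p for p in problems]

# Binding the current value as a default argument (the pattern used in the
# example-button handlers) freezes each value per callable.
fixed = [lambda p=p: p for p in problems]

assert [f() for f in buggy] == ["Problem C", "Problem C", "Problem C"]
assert [f() for f in fixed] == problems
```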