diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,10 +1,10 @@
 # Multi-Agent AI Collaboration System
 # Author: Spencer Purdy
 # Description: Enterprise-grade multi-agent system with specialized AI agents collaborating
-#              to solve complex problems through intelligent task decomposition and parallel processing.
+#              to solve complex problems through task decomposition and parallel processing.
 
 # Installation (uncomment for Google Colab)
-# !pip install gradio langchain langchain-openai openai networkx matplotlib asyncio aiohttp pandas numpy plotly python-dotenv pydantic scipy
+# !pip install gradio langchain langchain-openai openai networkx matplotlib aiohttp pandas numpy plotly python-dotenv pydantic
 
 import os
 import json
@@ -27,7 +27,6 @@
 import networkx as nx
 import matplotlib.pyplot as plt
 from matplotlib.patches import FancyBboxPatch
 import plotly.graph_objects as go
-import plotly.express as px
 from plotly.subplots import make_subplots
 
 # LangChain and AI libraries
@@ -40,7 +39,7 @@
 from pydantic import BaseModel, Field
 
 # Async libraries
 import aiohttp
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import ThreadPoolExecutor
 
 # Configure logging
 logging.basicConfig(
@@ -65,33 +64,15 @@ class Config:
     GRAPH_UPDATE_INTERVAL = 0.5  # seconds
     NODE_COLORS = {
         'Researcher': '#3498db',
-        'Analyst': '#e74c3c', 
+        'Analyst': '#e74c3c',
         'Critic': '#f39c12',
         'Synthesizer': '#2ecc71',
         'Coordinator': '#9b59b6'
     }
 
-    # Performance settings
-    ENABLE_PERFORMANCE_TRACKING = True
-    BENCHMARK_BASELINE = {
-        "single_agent_time": 45.0,
-        "single_agent_quality": 0.72
-    }
-
     # Report settings
     CONFIDENCE_THRESHOLD = 0.7
     MAX_REPORT_SECTIONS = 10
-    COMPANY_NAME = "Multi-Agent AI Platform"
-
-    # Demo settings
-    DEMO_MODE_ENABLED = True
-    DEMO_PROBLEMS = [
-        "Analyze the impact of remote work on team productivity and collaboration",
-        "Develop a strategy for sustainable urban transportation",
-        "Evaluate the risks and benefits of AI in healthcare",
-        "Design a framework for ethical AI development",
-        "Create a plan for digital transformation in education"
-    ]
 
 class AgentRole(Enum):
     """Enumeration of agent roles in the system."""
@@ -108,14 +89,6 @@ class TaskStatus(Enum):
     COMPLETED = "completed"
     FAILED = "failed"
 
-class MessageType(Enum):
-    """Types of messages between agents."""
-    TASK_ASSIGNMENT = "task_assignment"
-    COLLABORATION_REQUEST = "collaboration_request"
-    INFORMATION_SHARING = "information_sharing"
-    FEEDBACK = "feedback"
-    COMPLETION_REPORT = "completion_report"
-
 @dataclass
 class Task:
     """Represents a task to be executed by agents."""
@@ -129,7 +102,6 @@
     created_at: datetime = field(default_factory=datetime.now)
     completed_at: Optional[datetime] = None
     metadata: Dict[str, Any] = field(default_factory=dict)
-    performance_metrics: Dict[str, float] = field(default_factory=dict)
 
 @dataclass
 class AgentMessage:
     """Message passed between agents."""
@@ -137,102 +109,9 @@
     sender: str
     recipient: str
    content: str
-    message_type: MessageType
+    message_type: str = "task"
     metadata: Dict[str, Any] = field(default_factory=dict)
     timestamp: datetime = field(default_factory=datetime.now)
-    priority: int = 1  # 1 (low) to 5 (high)
-
-class PerformanceTracker:
-    """Tracks performance metrics for the multi-agent system."""
-
-    def __init__(self):
-        self.metrics = {
-            'task_completion_times': [],
-            'agent_utilization': {},
-            'collaboration_count': 0,
-            'total_messages': 0,
-            'quality_scores': [],
-            'system_start_time': None,
-            'system_end_time': None
-        }
-
-    def start_tracking(self):
-        """Start performance tracking."""
-        self.metrics['system_start_time'] = datetime.now()
-
-    def end_tracking(self):
-        """End performance tracking."""
-        self.metrics['system_end_time'] = datetime.now()
-
-    def record_task_completion(self, task: Task):
-        """Record task completion metrics."""
-        if task.created_at and task.completed_at:
-            completion_time = (task.completed_at - task.created_at).total_seconds()
-            self.metrics['task_completion_times'].append(completion_time)
-
-    def record_agent_activity(self, agent_name: str, activity_duration: float):
-        """Record agent activity duration."""
-        if agent_name not in self.metrics['agent_utilization']:
-            self.metrics['agent_utilization'][agent_name] = 0
-        self.metrics['agent_utilization'][agent_name] += activity_duration
-
-    def record_collaboration(self):
-        """Record a collaboration event."""
-        self.metrics['collaboration_count'] += 1
-
-    def record_message(self):
-        """Record a message exchange."""
-        self.metrics['total_messages'] += 1
-
-    def get_performance_summary(self) -> Dict[str, Any]:
-        """Get performance summary statistics."""
-        total_time = 0
-        if self.metrics['system_start_time'] and self.metrics['system_end_time']:
-            total_time = (self.metrics['system_end_time'] -
-                          self.metrics['system_start_time']).total_seconds()
-
-        avg_task_time = np.mean(self.metrics['task_completion_times']) if self.metrics['task_completion_times'] else 0
-
-        # Calculate improvement over baseline
-        baseline_time = Config.BENCHMARK_BASELINE['single_agent_time']
-        time_improvement = ((baseline_time - avg_task_time) / baseline_time * 100) if avg_task_time > 0 else 0
-
-        return {
-            'total_execution_time': total_time,
-            'average_task_completion_time': avg_task_time,
-            'total_collaborations': self.metrics['collaboration_count'],
-            'total_messages': self.metrics['total_messages'],
-            'agent_utilization': self.metrics['agent_utilization'],
-            'time_improvement_percentage': time_improvement,
-            'efficiency_score': self._calculate_efficiency_score()
-        }
-
-    def _calculate_efficiency_score(self) -> float:
-        """Calculate overall efficiency score."""
-        # Simple efficiency calculation based on multiple factors
-        factors = []
-
-        # Task completion speed factor
-        if self.metrics['task_completion_times']:
-            avg_time = np.mean(self.metrics['task_completion_times'])
-            speed_factor = min(1.0, Config.BENCHMARK_BASELINE['single_agent_time'] / avg_time)
-            factors.append(speed_factor)
-
-        # Collaboration efficiency factor
-        if self.metrics['total_messages'] > 0:
-            collab_factor = min(1.0, self.metrics['collaboration_count'] / self.metrics['total_messages'])
-            factors.append(collab_factor)
-
-        # Agent utilization factor
-        if self.metrics['agent_utilization']:
-            utilization_values = list(self.metrics['agent_utilization'].values())
-            if utilization_values:
-                avg_utilization = np.mean(utilization_values)
-                max_utilization = max(utilization_values)
-                balance_factor = avg_utilization / max_utilization if max_utilization > 0 else 0
-                factors.append(balance_factor)
-
-        return np.mean(factors) if factors else 0.5
 
 class AgentMemory:
     """Manages agent conversation history and context."""
 
@@ -241,16 +120,12 @@
         self.messages: List[AgentMessage] = []
         self.max_messages = max_messages
         self.context: Dict[str, Any] = {}
-        self.knowledge_base: Dict[str, Any] = {}
 
     def add_message(self, message: AgentMessage):
        """Add a message to memory."""
         self.messages.append(message)
         if len(self.messages) > self.max_messages:
             self.messages.pop(0)
-
-        # Extract and store important information
-        self._extract_knowledge(message)
 
     def get_recent_messages(self, n: int = 10) -> List[AgentMessage]:
         """Get n most recent messages."""
@@ -260,10 +135,6 @@
         """Get all messages from a specific sender."""
         return [msg for msg in self.messages if msg.sender == sender]
 
-    def get_high_priority_messages(self) -> List[AgentMessage]:
-        """Get high priority messages."""
-        return [msg for msg in self.messages if msg.priority >= 4]
-
     def update_context(self, key: str, value: Any):
         """Update context information."""
         self.context[key] = value
@@ -271,27 +142,11 @@
     def get_context(self, key: str) -> Any:
         """Get context information."""
         return self.context.get(key)
-
-    def _extract_knowledge(self, message: AgentMessage):
-        """Extract and store important knowledge from messages."""
-        # Simple keyword-based knowledge extraction
-        keywords = ['finding', 'conclusion', 'recommendation', 'insight', 'pattern']
-        content_lower = message.content.lower()
-
-        for keyword in keywords:
-            if keyword in content_lower:
-                knowledge_key = f"{message.sender}_{keyword}_{len(self.knowledge_base)}"
-                self.knowledge_base[knowledge_key] = {
-                    'content': message.content,
-                    'sender': message.sender,
-                    'timestamp': message.timestamp,
-                    'type': keyword
-                }
 
 class BaseAgent:
     """Base class for all AI agents in the system."""
 
-    def __init__(self, name: str, role: AgentRole, llm: Optional[ChatOpenAI] = None):
+    def __init__(self, name: str, role: AgentRole, llm: ChatOpenAI):
         self.name = name
         self.role = role
         self.llm = llm
@@ -299,8 +154,6 @@
         self.active = True
         self.current_task: Optional[Task] = None
         self.completed_tasks: List[Task] = []
-        self.performance_tracker = PerformanceTracker()
-        self.collaboration_partners: Set[str] = set()
 
     async def process_task(self, task: Task) -> Task:
         """Process a task and return the result."""
@@ -308,29 +161,16 @@
         task.status = TaskStatus.IN_PROGRESS
         task.assigned_to = self.name
 
-        # Record start time
-        start_time = datetime.now()
-
         try:
             # Execute task based on agent role
-            if self.llm:
-                result = await self._execute_task(task)
-            else:
-                # Demo mode - simulate execution
-                result = await self._simulate_task_execution(task)
+            result = await self._execute_task(task)
 
             task.result = result
             task.status = TaskStatus.COMPLETED
             task.completed_at = datetime.now()
             task.confidence = self._calculate_confidence(result)
 
-            # Record performance metrics
-            execution_time = (task.completed_at - start_time).total_seconds()
-            task.performance_metrics['execution_time'] = execution_time
-            task.performance_metrics['confidence'] = task.confidence
-
             self.completed_tasks.append(task)
-            self.performance_tracker.record_task_completion(task)
 
         except Exception as e:
             logger.error(f"Agent {self.name} failed to process task {task.id}: {str(e)}")
@@ -347,115 +187,42 @@
         """Execute the task - to be implemented by subclasses."""
         raise NotImplementedError("Subclasses must implement _execute_task")
 
-    async def _simulate_task_execution(self, task: Task) -> Any:
-        """Simulate task execution for demo mode."""
-        # Simulate processing time
-        await asyncio.sleep(np.random.uniform(1, 3))
-
-        # Generate simulated result based on role
-        simulation_templates = {
-            AgentRole.RESEARCHER: {
-                "findings": f"Research on '{task.description}' reveals multiple perspectives and data points.",
-                "sources": ["Academic studies", "Industry reports", "Expert interviews"],
-                "key_points": [
-                    "Significant trends identified in the domain",
-                    "Multiple stakeholder perspectives considered",
-                    "Historical context provides important insights"
-                ]
-            },
-            AgentRole.ANALYST: {
-                "analysis": f"Analysis of '{task.description}' shows clear patterns and relationships.",
-                "patterns": [{"description": "Trend identified", "type": "trend"}],
-                "insights": ["Data suggests strong correlations", "Multiple factors influence outcomes"],
-                "confidence_metrics": {"overall_confidence": 0.85}
-            },
-            AgentRole.CRITIC: {
-                "evaluation": f"Critical evaluation of '{task.description}' identifies strengths and areas for improvement.",
-                "strengths": ["Comprehensive approach", "Well-structured analysis"],
-                "gaps": ["Additional data needed", "Alternative perspectives to consider"],
-                "quality_score": {"overall": 0.78}
-            },
-            AgentRole.SYNTHESIZER: {
-                "synthesis": f"Synthesis of findings on '{task.description}' provides integrated insights.",
-                "key_themes": [{"theme": "Theme 1", "description": "Central finding"}],
-                "final_recommendations": ["Strategic recommendation 1", "Strategic recommendation 2"],
-                "executive_summary": "Comprehensive analysis reveals actionable insights."
-            }
-        }
-
-        return simulation_templates.get(self.role, {"result": "Task completed"})
-
     def _calculate_confidence(self, result: Any) -> float:
         """Calculate confidence score for the result."""
-        # Enhanced confidence calculation
-        base_confidence = 0.5
-
-        if result and isinstance(result, dict):
-            # Check for quality indicators
-            if 'confidence_metrics' in result:
-                return result['confidence_metrics'].get('overall_confidence', base_confidence)
-
-            # Calculate based on result completeness
-            expected_keys = {'findings', 'analysis', 'evaluation', 'synthesis'}
-            actual_keys = set(result.keys())
-            completeness = len(actual_keys.intersection(expected_keys)) / len(expected_keys)
-
-            # Calculate based on content length
-            content_length = sum(len(str(v)) for v in result.values() if isinstance(v, (str, list)))
-            length_factor = min(1.0, content_length / 500)
-
-            confidence = base_confidence + (completeness * 0.3) + (length_factor * 0.2)
-            return min(0.95, confidence)
-
-        return base_confidence
+        # Basic confidence calculation - can be overridden by subclasses.
+        # Subclass results are dicts, so score the stringified payload rather
+        # than requiring a plain string (which would always fall through to 0.5).
+        text = result if isinstance(result, str) else (str(result) if result else "")
+        if len(text) > 50:
+            return min(0.9, 0.5 + len(text) / 1000)
+        return 0.5
 
-    async def collaborate(self, other_agent: 'BaseAgent', message: AgentMessage) -> AgentMessage:
+    async def collaborate(self, other_agent: 'BaseAgent', message: AgentMessage):
         """Handle collaboration with another agent."""
         self.memory.add_message(message)
-        self.collaboration_partners.add(other_agent.name)
-        self.performance_tracker.record_collaboration()
 
         # Process collaboration request
-        response_content = await self._process_collaboration(message)
+        response = await self._process_collaboration(message)
 
-        # Create response message
+        # Send response back
         response_message = AgentMessage(
             sender=self.name,
             recipient=other_agent.name,
-            content=response_content,
-            message_type=MessageType.INFORMATION_SHARING,
-            priority=message.priority
+            content=response,
+            message_type="response"
         )
 
         other_agent.memory.add_message(response_message)
-        self.performance_tracker.record_message()
 
         return response_message
 
     async def _process_collaboration(self, message: AgentMessage) -> str:
         """Process collaboration message - to be implemented by subclasses."""
-        return f"{self.name} acknowledges collaboration request from {message.sender}"
-
-    def get_status_summary(self) -> Dict[str, Any]:
-        """Get current status summary of the agent."""
-        return {
-            'name': self.name,
-            'role': self.role.value,
-            'active': self.active,
-            'current_task': self.current_task.description if self.current_task else None,
-            'completed_tasks': len(self.completed_tasks),
-            'average_confidence': np.mean([t.confidence for t in self.completed_tasks]) if self.completed_tasks else 0,
-            'collaboration_count': len(self.collaboration_partners),
-            'memory_size': len(self.memory.messages)
-        }
+        return f"Acknowledged message from {message.sender}"
 
 class ResearcherAgent(BaseAgent):
     """Agent specialized in researching and gathering information."""
 
-    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
+    def __init__(self, name: str, llm: ChatOpenAI):
         super().__init__(name, AgentRole.RESEARCHER, llm)
         self.research_sources: List[str] = []
-        self.research_methods = ["literature_review", "data_collection", "expert_consultation", "field_research"]
 
     async def _execute_task(self, task: Task) -> Any:
         """Execute research task."""
@@ -465,119 +232,68 @@
            1. Break down complex topics into research questions
            2. Identify key information sources and data points
            3. Provide detailed, factual information with citations where possible
-            4. Flag areas requiring further investigation
-            5. Maintain objectivity and consider multiple perspectives"""),
+            4. Flag areas requiring further investigation"""),
             HumanMessage(content=f"Research the following: {task.description}")
         ])
 
         response = await self.llm.ainvoke(prompt.format_messages())
 
-        # Extract and structure research findings
+        # Extract research findings
         research_result = {
             "findings": response.content,
             "sources": self._extract_sources(response.content),
             "key_points": self._extract_key_points(response.content),
-            "areas_for_investigation": self._identify_gaps(response.content),
-            "research_quality_score": self._assess_research_quality(response.content)
+            "areas_for_investigation": self._identify_gaps(response.content)
         }
 
-        # Update internal knowledge
-        self.memory.update_context('latest_research', research_result)
-
         return research_result
 
     def _extract_sources(self, content: str) -> List[str]:
         """Extract potential sources from research content."""
+        # Simplified source extraction
         sources = []
-        source_indicators = ['source:', 'reference:', 'based on:', 'according to', 'study:', 'report:']
-
         lines = content.split('\n')
         for line in lines:
-            line_lower = line.lower()
-            for indicator in source_indicators:
-                if indicator in line_lower:
-                    sources.append(line.strip())
-                    break
-
-        return sources[:10]  # Limit to top 10 sources
+            if any(keyword in line.lower() for keyword in ['source:', 'reference:', 'based on:', 'according to']):
                sources.append(line.strip())
+        return sources[:5]  # Limit to top 5 sources
 
     def _extract_key_points(self, content: str) -> List[str]:
         """Extract key points from research."""
         key_points = []
         lines = content.split('\n')
-
        for line in lines:
-            line = line.strip()
-            # Check for numbered or bulleted points
-            if line and (line[0].isdigit() or line.startswith('-') or line.startswith('•')):
-                key_points.append(line)
-            # Check for key phrases
-            elif any(phrase in line.lower() for phrase in ['key finding:', 'important:', 'notably:']):
-                key_points.append(line)
-
-        return key_points[:15]  # Limit to top 15 points
+            if line.strip() and (line.strip()[0].isdigit() or line.strip().startswith('-')):
+                key_points.append(line.strip())
+        return key_points[:10]  # Limit to top 10 points
 
     def _identify_gaps(self, content: str) -> List[str]:
         """Identify areas needing more research."""
         gaps = []
-        gap_indicators = ['unclear', 'requires further', 'need more', 'investigate',
-                          'unknown', 'limited data', 'insufficient evidence']
-
-        sentences = content.split('.')
-        for sentence in sentences:
-            sentence_lower = sentence.lower()
-            if any(indicator in sentence_lower for indicator in gap_indicators):
-                gaps.append(sentence.strip() + '.')
-
+        keywords = ['unclear', 'requires further', 'need more', 'investigate', 'unknown']
+        lines = content.split('\n')
+        for line in lines:
+            if any(keyword in line.lower() for keyword in keywords):
+                gaps.append(line.strip())
         return gaps[:5]
-
-    def _assess_research_quality(self, content: str) -> float:
-        """Assess the quality of research output."""
-        quality_score = 0.5  # Base score
-
-        # Check for sources
-        if self._extract_sources(content):
-            quality_score += 0.15
-
-        # Check for structured content
-        if self._extract_key_points(content):
-            quality_score += 0.15
-
-        # Check for comprehensive coverage
-        word_count = len(content.split())
-        if word_count > 300:
-            quality_score += 0.1
-
-        # Check for analytical depth
-        analytical_terms = ['analysis', 'evaluation', 'comparison', 'contrast', 'implication']
-        if any(term in content.lower() for term in analytical_terms):
-            quality_score += 0.1
-
-        return min(1.0, quality_score)
 
 class AnalystAgent(BaseAgent):
     """Agent specialized in analyzing data and identifying patterns."""
 
-    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
+    def __init__(self, name: str, llm: ChatOpenAI):
         super().__init__(name, AgentRole.ANALYST, llm)
-        self.analysis_methods = ["statistical", "comparative", "trend", "causal", "predictive"]
-        self.analysis_frameworks = ["SWOT", "PESTLE", "Porter's Five Forces", "Cost-Benefit"]
+        self.analysis_methods: List[str] = ["statistical", "comparative", "trend", "causal"]
 
     async def _execute_task(self, task: Task) -> Any:
         """Execute analysis task."""
-        # Get context from previous research if available
-        context = self._get_relevant_context(task)
-
         prompt = ChatPromptTemplate.from_messages([
             SystemMessage(content="""You are an Analyst Agent specializing in data analysis and pattern recognition.
             Your role is to:
            1. Analyze information systematically and objectively
            2. Identify patterns, trends, and correlations
            3. Provide quantitative insights where possible
-            4. Draw logical conclusions based on evidence
-            5. Apply appropriate analytical frameworks
-            6. Consider multiple analytical perspectives"""),
-            HumanMessage(content=f"Analyze the following: {task.description}\n\nContext: {context}")
+            4. Draw logical conclusions based on evidence"""),
+            HumanMessage(content=f"Analyze the following: {task.description}")
         ])
 
         response = await self.llm.ainvoke(prompt.format_messages())
@@ -588,158 +304,88 @@
             "patterns": self._identify_patterns(response.content),
             "insights": self._extract_insights(response.content),
             "recommendations": self._generate_recommendations(response.content),
-            "confidence_metrics": self._calculate_analysis_confidence(response.content),
-            "analytical_framework": self._identify_framework_used(response.content)
+            "confidence_metrics": self._calculate_analysis_confidence(response.content)
         }
 
-        # Store analysis in memory
-        self.memory.update_context('latest_analysis', analysis_result)
-
         return analysis_result
 
-    def _get_relevant_context(self, task: Task) -> str:
-        """Get relevant context from memory for the task."""
-        context_items = []
-
-        # Get recent messages related to the task
-        recent_messages = self.memory.get_recent_messages(5)
-        for msg in recent_messages:
-            if task.description.lower() in msg.content.lower():
-                context_items.append(f"Previous finding: {msg.content[:200]}...")
-
-        # Get knowledge base items
-        for key, knowledge in self.memory.knowledge_base.items():
-            if 'finding' in knowledge['type'] or 'insight' in knowledge['type']:
-                context_items.append(f"Known insight: {knowledge['content'][:200]}...")
-
-        return "\n".join(context_items[:3])  # Limit context items
-
     def _identify_patterns(self, content: str) -> List[Dict[str, str]]:
         """Identify patterns in the analysis."""
         patterns = []
-        pattern_types = {
-            'trend': ['trend', 'increasing', 'decreasing', 'growth', 'decline'],
-            'correlation': ['correlation', 'relationship', 'associated', 'linked'],
-            'cyclical': ['cycle', 'periodic', 'seasonal', 'recurring'],
-            'anomaly': ['anomaly', 'outlier', 'unusual', 'exceptional']
-        }
+        pattern_keywords = ['pattern', 'trend', 'correlation', 'relationship', 'consistent']
 
-        sentences = content.split('.')
-        for sentence in sentences:
-            sentence_lower = sentence.lower()
-            for pattern_type, keywords in pattern_types.items():
-                if any(keyword in sentence_lower for keyword in keywords):
-                    patterns.append({
-                        "description": sentence.strip() + '.',
-                        "type": pattern_type,
-                        "confidence": 0.8
-                    })
-                    break
+        lines = content.split('\n')
+        for line in lines:
+            if any(keyword in line.lower() for keyword in pattern_keywords):
+                patterns.append({
+                    "description": line.strip(),
+                    "type": self._classify_pattern(line)
+                })
 
-        return patterns[:8]
+        return patterns[:5]
+
+    def _classify_pattern(self, description: str) -> str:
+        """Classify the type of pattern."""
+        description_lower = description.lower()
+        if 'trend' in description_lower:
+            return 'trend'
+        elif 'correlation' in description_lower:
+            return 'correlation'
+        elif 'cycle' in description_lower or 'periodic' in description_lower:
+            return 'cyclical'
+        else:
+            return 'general'
 
     def _extract_insights(self, content: str) -> List[str]:
         """Extract key insights from analysis."""
         insights = []
-        insight_indicators = ['shows', 'indicates', 'suggests', 'reveals',
-                              'demonstrates', 'implies', 'means that', 'therefore']
+        insight_keywords = ['shows', 'indicates', 'suggests', 'reveals', 'demonstrates']
 
         sentences = content.split('.')
         for sentence in sentences:
-            sentence_lower = sentence.lower()
-            if any(indicator in sentence_lower for indicator in insight_indicators):
+            if any(keyword in sentence.lower() for keyword in insight_keywords):
                 insights.append(sentence.strip() + '.')
 
-        return insights[:10]
+        return insights[:7]
 
     def _generate_recommendations(self, content: str) -> List[str]:
         """Generate recommendations based on analysis."""
         recommendations = []
-        rec_indicators = ['recommend', 'suggest', 'should', 'consider',
-                          'advise', 'propose', 'it would be beneficial']
+        rec_keywords = ['recommend', 'suggest', 'should', 'consider', 'advise']
 
         sentences = content.split('.')
         for sentence in sentences:
-            sentence_lower = sentence.lower()
-            if any(indicator in sentence_lower for indicator in rec_indicators):
+            if any(keyword in sentence.lower() for keyword in rec_keywords):
                 recommendations.append(sentence.strip() + '.')
 
-        # Prioritize recommendations
-        prioritized = []
-        for rec in recommendations:
-            priority = "high" if any(word in rec.lower() for word in ['critical', 'essential', 'must']) else "medium"
-            prioritized.append({"recommendation": rec, "priority": priority})
-
-        return prioritized[:7]
+        return recommendations[:5]
 
     def _calculate_analysis_confidence(self, content: str) -> Dict[str, float]:
         """Calculate confidence metrics for the analysis."""
-        # Count evidence indicators
-        evidence_count = sum(content.lower().count(word) for word in ['evidence', 'data', 'shows', 'proves'])
-        uncertainty_count = sum(content.lower().count(word) for word in ['may', 'might', 'possibly', 'perhaps'])
-
-        # Calculate confidence scores
-        evidence_strength = min(1.0, evidence_count / 10)
-        certainty_level = max(0.0, 1.0 - (uncertainty_count / 10))
-
-        # Check for quantitative analysis
-        quantitative_indicators = ['percentage', '%', 'ratio', 'correlation', 'statistical']
-        quantitative_score = 0.7 if any(ind in content.lower() for ind in quantitative_indicators) else 0.5
-
-        overall_confidence = (evidence_strength + certainty_level + quantitative_score) / 3
+        word_count = len(content.split())
+        evidence_count = content.lower().count('evidence') + content.lower().count('data') + content.lower().count('shows')
+        uncertainty_count = content.lower().count('may') + content.lower().count('might') + content.lower().count('possibly')
+
+        # Clamp on both ends so heavy hedging cannot push the score below zero
+        confidence = max(0.05, min(0.95, 0.5 + (evidence_count * 0.1) - (uncertainty_count * 0.05) + (word_count / 1000)))
 
         return {
-            "overall_confidence": overall_confidence,
-            "evidence_strength": evidence_strength,
-            "certainty_level": certainty_level,
-            "quantitative_score": quantitative_score
+            "overall_confidence": confidence,
+            "evidence_strength": min(1.0, evidence_count / 10),
+            "certainty_level": max(0.0, 1.0 - (uncertainty_count / 10))
        }
-
-    def _identify_framework_used(self, content: str) -> Optional[str]:
-        """Identify which analytical framework was used."""
-        content_lower = content.lower()
-
-        for framework in self.analysis_frameworks:
-            if framework.lower() in content_lower:
-                return framework
-
-        # Check for implicit framework usage
-        if all(word in content_lower for word in ['strength', 'weakness', 'opportunity', 'threat']):
-            return "SWOT"
-        elif any(word in content_lower for word in ['political', 'economic', 'social', 'technological']):
-            return "PESTLE"
-
-        return None
 
 class CriticAgent(BaseAgent):
     """Agent specialized in critical evaluation and quality assurance."""
 
-    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
+    def __init__(self, name: str, llm: ChatOpenAI):
         super().__init__(name, AgentRole.CRITIC, llm)
         self.evaluation_criteria = [
             "accuracy", "completeness", "logic", "evidence",
-            "clarity", "relevance", "consistency", "objectivity"
+            "clarity", "relevance", "consistency"
         ]
-        self.evaluation_rubric = self._create_evaluation_rubric()
-
-    def _create_evaluation_rubric(self) -> Dict[str, Dict[str, float]]:
-        """Create evaluation rubric with weighted criteria."""
-        return {
-            "accuracy": {"weight": 0.20, "score": 0.0},
-            "completeness": {"weight": 0.15, "score": 0.0},
-            "logic": {"weight": 0.15, "score": 0.0},
-            "evidence": {"weight": 0.15, "score": 0.0},
-            "clarity": {"weight": 0.10, "score": 0.0},
-            "relevance": {"weight": 0.10, "score": 0.0},
-            "consistency": {"weight": 0.10, "score": 0.0},
-            "objectivity": {"weight": 0.05, "score": 0.0}
-        }
 
     async def _execute_task(self, task: Task) -> Any:
         """Execute critical evaluation task."""
-        # Get content to evaluate from context
-        evaluation_context = self._gather_evaluation_context(task)
-
         prompt = ChatPromptTemplate.from_messages([
             SystemMessage(content="""You are a Critic Agent specializing in rigorous evaluation and quality assurance.
             Your role is to:
@@ -747,10 +393,8 @@
            2. Identify weaknesses, gaps, and potential biases
            3. Verify logical consistency and evidence quality
            4. Suggest improvements and alternative perspectives
-            5. Ensure high standards of analysis
-            6. Apply systematic evaluation criteria
-            7. Provide constructive feedback"""),
-            HumanMessage(content=f"Critically evaluate the following: {task.description}\n\nContent to evaluate: {evaluation_context}")
+            5. Ensure high standards of analysis"""),
+            HumanMessage(content=f"Critically evaluate the following: {task.description}")
         ])
 
         response = await self.llm.ainvoke(prompt.format_messages())
@@ -762,260 +406,84 @@
             "weaknesses": self._identify_weaknesses(response.content),
             "gaps": self._identify_gaps(response.content),
             "improvements": self._suggest_improvements(response.content),
-            "quality_score": self._calculate_quality_score(response.content),
-            "alternative_perspectives": self._identify_alternatives(response.content),
-            "final_verdict": self._generate_verdict(response.content)
+            "quality_score": self._calculate_quality_score(response.content)
         }
 
-        # Update evaluation history
-        self.memory.update_context('evaluation_history', critique_result)
-
         return critique_result
 
-    def _gather_evaluation_context(self, task: Task) -> str:
-        """Gather relevant context for evaluation."""
-        context_items = []
-
-        # Get recent analysis and research results
-        recent_messages = self.memory.get_recent_messages(10)
-        for msg in recent_messages:
-            if msg.message_type in [MessageType.COMPLETION_REPORT, MessageType.INFORMATION_SHARING]:
-                context_items.append(f"{msg.sender}: {msg.content[:300]}...")
-
-        # Get knowledge base insights
-        for key, knowledge in self.memory.knowledge_base.items():
-            if knowledge['type'] in ['finding', 'conclusion', 'insight']:
-                context_items.append(f"Previous {knowledge['type']}: {knowledge['content'][:200]}...")
-
-        return "\n\n".join(context_items[:5])
-
-    def _identify_strengths(self, content: str) -> List[Dict[str, str]]:
+    def _identify_strengths(self, content: str) -> List[str]:
         """Identify strengths in the evaluated content."""
         strengths = []
-        strength_indicators = ['strong', 'excellent', 'well', 'good', 'effective',
-                               'solid', 'robust', 'comprehensive', 'thorough']
+        strength_keywords = ['strong', 'excellent', 'well', 'good', 'effective', 'solid']
 
         sentences = content.split('.')
         for sentence in sentences:
-            sentence_lower = sentence.lower()
-            for indicator in strength_indicators:
-                if indicator in sentence_lower:
-                    strengths.append({
-                        "strength": sentence.strip() + '.',
-                        "category": self._categorize_strength(sentence),
-                        "impact": "high" if any(word in sentence_lower for word in ['very', 'extremely', 'highly']) else "medium"
-                    })
-                    break
-
-        return strengths[:6]
-
-    def _categorize_strength(self, sentence: str) -> str:
-        """Categorize the type of strength identified."""
-        sentence_lower = sentence.lower()
-
-        if any(word in sentence_lower for word in ['method', 'approach', 'framework']):
-            return "methodology"
-        elif any(word in sentence_lower for word in ['data', 'evidence', 'support']):
-            return "evidence"
-        elif any(word in sentence_lower for word in ['logic', 'reasoning', 'argument']):
-            return "reasoning"
-        elif any(word in sentence_lower for word in ['clear', 'organized', 'structured']):
-            return "presentation"
-        else:
-            return "general"
+            if any(keyword in sentence.lower() for keyword in strength_keywords):
+                strengths.append(sentence.strip() + '.')
 
+        return strengths[:5]
 
-    def _identify_weaknesses(self, content: str) -> List[Dict[str, str]]:
+    def _identify_weaknesses(self, content: str) -> List[str]:
         """Identify weaknesses in the evaluated content."""
         weaknesses = []
-        weakness_indicators = ['weak', 'lack', 'insufficient', 'poor', 'inadequate',
-                               'missing', 'limited', 'unclear', 'vague']
+        weakness_keywords = ['weak', 'lack', 'insufficient', 'poor', 'inadequate', 'missing']
 
         sentences = content.split('.')
         for sentence in sentences:
-            sentence_lower = sentence.lower()
-            for indicator in weakness_indicators:
-                if indicator in sentence_lower:
-                    weaknesses.append({
-                        "weakness": sentence.strip() + '.',
-                        "severity": self._assess_severity(sentence),
-                        "category": self._categorize_weakness(sentence)
-                    })
-                    break
-
-        return weaknesses[:6]
-
-    def _assess_severity(self, sentence: str) -> str:
-        """Assess the severity of a weakness."""
-        sentence_lower = sentence.lower()
-
-        if any(word in sentence_lower for word in ['critical', 'severe', 'major', 'significant']):
-            return "high"
-        elif any(word in sentence_lower for word in ['moderate', 'some', 'partial']):
-            return "medium"
-        else:
-            return "low"
-
-    def _categorize_weakness(self, sentence: str) -> str:
-        """Categorize the type of weakness identified."""
-        sentence_lower = sentence.lower()
-
-        if any(word in sentence_lower for word in ['data', 'evidence', 'support']):
-            return "evidence"
-        elif any(word in sentence_lower for word in ['logic', 'reasoning', 'argument']):
-            return "reasoning"
-        elif any(word in sentence_lower for word in ['bias', 'objective', 'neutral']):
-            return "objectivity"
-        elif any(word in sentence_lower for word in ['complete', 'comprehensive', 'thorough']):
-            return "completeness"
-        else:
-            return "general"
+            if any(keyword in sentence.lower() for keyword in weakness_keywords):
+                weaknesses.append(sentence.strip() + '.')
 
+        return weaknesses[:5]
 
     def _identify_gaps(self, content: str) -> List[str]:
         """Identify gaps in the analysis."""
         gaps = []
-        gap_indicators = ['gap', 'missing', 'overlook', 'fail to', 'does not address',
-                          'ignores', 'omits', 'neglects']
+        gap_keywords = ['gap', 'missing', 'overlook', 'fail to', 'does not address', 'ignores']
 
         sentences = content.split('.')
         for sentence in sentences:
-            sentence_lower = sentence.lower()
-            if any(indicator in sentence_lower for indicator in gap_indicators):
+            if any(keyword in sentence.lower() for keyword in gap_keywords):
                 gaps.append(sentence.strip() + '.')
 
         return gaps[:5]
 
-    def _suggest_improvements(self, content: str) -> List[Dict[str, str]]:
+    def _suggest_improvements(self, content: str) -> List[str]:
         """Suggest improvements based on critique."""
         improvements = []
-        improvement_indicators = ['could', 'should', 'improve', 'enhance',
-                                  'strengthen', 'add', 'consider', 'recommend']
+        improvement_keywords = ['could', 'should', 'improve', 'enhance', 'strengthen', 'add']
 
         sentences = content.split('.')
         for sentence in sentences:
-            sentence_lower = sentence.lower()
-            if any(indicator in sentence_lower for indicator in improvement_indicators):
-                improvements.append({
-                    "suggestion": sentence.strip() + '.',
-                    "priority": self._prioritize_improvement(sentence),
-                    "effort": self._estimate_effort(sentence)
-                })
-
-        return improvements[:7]
-
-    def _prioritize_improvement(self, sentence: str) -> str:
-        """Prioritize improvement suggestions."""
-        sentence_lower = sentence.lower()
-
-        if any(word in sentence_lower for word in ['critical', 'essential', 'must', 'urgent']):
-            return "high"
-        elif any(word in sentence_lower for word in ['should', 'important', 'recommend']):
-            return "medium"
-        else:
-            return "low"
-
-    def _estimate_effort(self, sentence: str) -> str:
-        """Estimate effort required for improvement."""
-        sentence_lower = sentence.lower()
-
-        if any(word in sentence_lower for word in ['simple', 'easy', 'quick', 'minor']):
-            return "low"
-        elif any(word in sentence_lower for word in ['moderate', 'some', 'reasonable']):
-            return "medium"
-        elif any(word in sentence_lower for word in ['significant', 'substantial', 'major']):
-            return "high"
-        else:
-            return "medium"
+            if any(keyword in sentence.lower() for keyword in improvement_keywords):
+                improvements.append(sentence.strip() + '.')
 
+        return improvements[:5]
 
     def _calculate_quality_score(self, content: str) -> Dict[str, float]:
-        """Calculate detailed quality scores."""
-        scores = self.evaluation_rubric.copy()
-        content_lower = content.lower()
-
-        # Score each criterion based on content analysis
-        for criterion in self.evaluation_criteria:
-            score = 0.5  # Base score
-
-            # Positive indicators
-            if criterion in content_lower and any(word in content_lower for word in ['good', 'strong', 'excellent']):
-                score += 0.3
-
-            # Negative indicators
-            if criterion in content_lower and any(word in content_lower for word in ['poor', 'weak', 'lacking']):
-                score -= 0.3
-
-            scores[criterion]["score"] = max(0.0, min(1.0, score))
-
-        # Calculate overall score
-        overall = sum(scores[c]["score"] * scores[c]["weight"] for c in scores)
-
-        return {
-            "overall": overall,
-            "breakdown": {c: scores[c]["score"] for c in scores},
-            "grade": self._convert_to_grade(overall)
-        }
-
-    def _convert_to_grade(self, score: float) -> str:
-        """Convert numeric score to letter grade."""
-        if score >= 0.9:
-            return "A"
-        elif score >= 0.8:
-            return "B"
-        elif score >= 0.7:
-            return "C"
-        elif score >= 0.6:
-            return "D"
-        else:
-            return "F"
-
-    def _identify_alternatives(self, content: str) -> List[str]:
-        """Identify alternative perspectives mentioned."""
-        alternatives = []
-        alternative_indicators = ['alternatively', 'another perspective', 'different approach',
-                                  'could also', 'different view', 'alternative']
-
-        sentences = content.split('.')
-        for sentence in sentences:
-            sentence_lower = sentence.lower()
-            if any(indicator in sentence_lower for indicator in alternative_indicators):
-                alternatives.append(sentence.strip() + '.')
-
-        return alternatives[:4]
-
-    def _generate_verdict(self, content: str) -> Dict[str, str]:
-        """Generate final verdict based on evaluation."""
-        # Simple verdict generation based on content sentiment
-        positive_count = sum(content.lower().count(word) for word in ['good', 'strong', 'excellent', 'effective'])
-        negative_count = sum(content.lower().count(word) for word in ['poor', 'weak', 'lacking', 'insufficient'])
-
-        if positive_count > negative_count * 2:
-            verdict = "Approved with minor revisions"
-            confidence = "high"
-        elif positive_count > negative_count:
-            verdict = "Approved with moderate revisions"
-            confidence = "medium"
-        else:
-            verdict = "Requires significant improvements"
-            confidence = "medium"
-
-        return {
-            "verdict": verdict,
-            "confidence": confidence,
-            "summary": "Based on comprehensive evaluation across multiple criteria."
-        }
+        """Calculate quality scores for different criteria."""
+        scores = {}
+
+        for criterion in self.evaluation_criteria:
+            # Simplified scoring based on keyword presence
+            positive_count = content.lower().count(criterion) + content.lower().count('good') + content.lower().count('strong')
+            negative_count = content.lower().count('poor') + content.lower().count('weak') + content.lower().count('lacking')
+
+            score = min(1.0, max(0.0, 0.5 + (positive_count * 0.1) - (negative_count * 0.15)))
+            scores[criterion] = score
+
+        scores['overall'] = sum(scores.values()) / len(scores)
+
+        return scores
 
 class SynthesizerAgent(BaseAgent):
     """Agent specialized in synthesizing information and creating coherent narratives."""
 
-    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
+    def __init__(self, name: str, llm: ChatOpenAI):
         super().__init__(name, AgentRole.SYNTHESIZER, llm)
-        self.synthesis_strategies = ["integrate", "summarize", "reconcile", "consolidate", "harmonize"]
-        self.output_formats = ["executive_summary", "detailed_report", "action_plan", "strategic_recommendation"]
+        self.synthesis_strategies = ["integrate", "summarize", "reconcile", "consolidate"]
 
     async def _execute_task(self, task: Task) -> Any:
         """Execute synthesis task."""
-        # Gather all relevant information from previous agents
-        synthesis_input = self._gather_synthesis_input(task)
-
         prompt = ChatPromptTemplate.from_messages([
             SystemMessage(content="""You are a Synthesizer Agent specializing in integrating diverse information.
             Your role is to:
@@ -1023,10 +491,8 @@
            2. Resolve contradictions and find common ground
            3. Create comprehensive summaries that capture key insights
            4. Generate actionable conclusions and recommendations
-            5. Ensure clarity and accessibility of complex information
-            6. Prioritize information based on relevance and impact
-            7. Create structured outputs suitable for decision-making"""),
-            HumanMessage(content=f"Synthesize the following information: {task.description}\n\nInput data: {synthesis_input}")
+            5. Ensure clarity and accessibility of complex information"""),
+            HumanMessage(content=f"Synthesize the following information: {task.description}")
         ])
 
         response = await self.llm.ainvoke(prompt.format_messages())
@@ -1038,452 +504,116 @@
             "consensus_points": self._identify_consensus(response.content),
             "contradictions": self._identify_contradictions(response.content),
             "final_recommendations": self._generate_final_recommendations(response.content),
-            "executive_summary": self._create_executive_summary(response.content),
-            "action_items": self._extract_action_items(response.content),
-            "confidence_level": self._assess_synthesis_confidence(response.content)
+            "executive_summary": self._create_executive_summary(response.content)
         }
 
-        # Store synthesis for future reference
-        self.memory.update_context('latest_synthesis', synthesis_result)
-
         return synthesis_result
 
-    def _gather_synthesis_input(self, task: Task) -> str:
-        """Gather all relevant information for synthesis."""
-        input_sections = []
-
-        # Get findings from all agents
-        agent_findings = {}
-        for msg in self.memory.get_recent_messages(20):
-            if msg.sender not in agent_findings:
-                agent_findings[msg.sender] = []
-            agent_findings[msg.sender].append(msg.content[:500])
-
-        # Structure input by agent type
-        for agent, findings in agent_findings.items():
-            if findings:
-                input_sections.append(f"\n{agent} Contributions:\n" + "\n".join(findings[:3]))
-
-        # Add knowledge base insights
-        knowledge_items = []
-        for key, knowledge in self.memory.knowledge_base.items():
-            knowledge_items.append(f"{knowledge['type'].title()}: {knowledge['content'][:200]}...")
-
-        if knowledge_items:
-            input_sections.append("\nKnowledge Base:\n" + "\n".join(knowledge_items[:5]))
-
-        return "\n".join(input_sections)
-
-    def _extract_themes(self, content: str) -> List[Dict[str, Any]]:
+    def _extract_themes(self, content: str) -> List[Dict[str, str]]:
         """Extract major themes from synthesis."""
         themes = []
-        theme_indicators = ['theme', 'pattern', 'trend', 'common', 'recurring',
-                            'central', 'key finding', 'main point']
+        theme_keywords = ['theme', 'pattern', 'trend', 'common', 'recurring', 'central']
 
-        # Split into paragraphs and analyze
         paragraphs = content.split('\n\n')
-        theme_count = 0
-
-        for paragraph in paragraphs:
-            paragraph_lower = paragraph.lower()
-            if any(indicator in paragraph_lower for indicator in theme_indicators):
-                theme_count += 1
+        for i, paragraph in enumerate(paragraphs[:5]):  # Limit to first 5 paragraphs
+            if any(keyword in paragraph.lower() for keyword in theme_keywords):
                 themes.append({
-                    "theme": f"Theme {theme_count}",
-                    "description": paragraph.strip()[:300] + "..." if len(paragraph) > 300 else paragraph.strip(),
-                    "importance": self._assess_theme_importance(paragraph),
-                    "support_level": self._assess_support_level(paragraph)
+                    "theme": f"Theme {i+1}",
+                    "description": paragraph.strip()[:200] + "..." if len(paragraph) > 200 else paragraph.strip()
                 })
 
-        # If no explicit themes found, extract from content structure
-        if not themes and paragraphs:
-            for i, paragraph in enumerate(paragraphs[:5]):
-                if len(paragraph.strip()) > 50:
-                    themes.append({
-                        "theme": f"Finding {i+1}",
-                        "description": paragraph.strip()[:300] + "..." if len(paragraph) > 300 else paragraph.strip(),
-                        "importance": "medium",
-                        "support_level": "moderate"
-                    })
-
-        return themes[:6]
-
-    def _assess_theme_importance(self, content: str) -> str:
-        """Assess the importance of a theme."""
-        content_lower = content.lower()
-
-        high_importance_indicators = ['critical', 'essential', 'fundamental', 'crucial', 'vital']
-        if any(indicator in content_lower for indicator in high_importance_indicators):
-            return "high"
-
-        low_importance_indicators = ['minor', 'secondary', 'marginal', 'peripheral']
-        if any(indicator in content_lower for indicator in low_importance_indicators):
-            return "low"
-
-        return "medium"
-
-    def _assess_support_level(self, content: str) -> str:
-        """Assess the level of support for a theme."""
-        content_lower = content.lower()
-
-        strong_support = ['consensus', 'unanimous', 'clear evidence', 'strongly supported']
-        if any(indicator in content_lower for indicator in strong_support):
-            return "strong"
-
-        weak_support = ['limited evidence', 'some indication', 'preliminary', 'tentative']
-        if any(indicator in content_lower for indicator in weak_support):
-            return "weak"
-
-        return "moderate"
+        return themes
 
-    def _identify_consensus(self, content: str) -> List[Dict[str, str]]:
+    def _identify_consensus(self, content: str) -> List[str]:
         """Identify points of consensus."""
-        consensus_points = []
-        consensus_indicators = ['agree', 'consensus', 'common', 'shared', 'unanimous',
-                                'consistent', 'alignment', 'convergence']
+        consensus = []
+        consensus_keywords = ['agree', 'consensus', 'common', 'shared', 'unanimous', 'consistent']
 
         sentences = content.split('.')
         for sentence in sentences:
-            sentence_lower = sentence.lower()
-            if any(indicator in sentence_lower for indicator in consensus_indicators):
-                consensus_points.append({
-                    "point": sentence.strip() + '.',
-                    "strength": "strong" if "unanimous" in sentence_lower or "clear consensus" in sentence_lower else "moderate"
-                })
+            if any(keyword in sentence.lower() for keyword in consensus_keywords):
                consensus.append(sentence.strip() + '.')
 
-        return consensus_points[:6]
+        return consensus[:5]
 
-    def _identify_contradictions(self, content: str) -> List[Dict[str, str]]:
+    def _identify_contradictions(self, content: str) -> List[str]:
         """Identify contradictions or conflicts."""
         contradictions = []
-        conflict_indicators = ['however', 'contrary', 'conflict', 'disagree', 'opposing',
-                               'contradicts', 'tension', 'divergent', 'inconsistent']
+        conflict_keywords = ['however', 'contrary', 'conflict', 'disagree', 'opposing', 'contradicts']
 
         sentences = content.split('.')
         for sentence in sentences:
-            sentence_lower = sentence.lower()
-            if any(indicator in sentence_lower for indicator in conflict_indicators):
-                contradictions.append({
-                    "contradiction": sentence.strip() + '.',
-                    "resolution_suggested": self._check_for_resolution(sentence),
-                    "impact": self._assess_contradiction_impact(sentence)
-                })
+            if any(keyword in sentence.lower() for keyword in conflict_keywords):
                contradictions.append(sentence.strip() + '.')
 
-        return contradictions[:4]
-
-    def _check_for_resolution(self, sentence: str) -> bool:
-        """Check if a resolution is suggested for the contradiction."""
-        resolution_indicators = ['can be resolved', 'reconcile', 'bridge', 'common ground', 'compromise']
-        return any(indicator in sentence.lower() for indicator in resolution_indicators)
-
-    def _assess_contradiction_impact(self, sentence: str) -> str:
-        """Assess the impact of a contradiction."""
-        sentence_lower = sentence.lower()
-
-        if any(word in sentence_lower for word in ['fundamental', 'major', 'significant']):
-            return "high"
-        elif any(word in sentence_lower for word in ['minor', 'small', 'slight']):
-            return "low"
-        else:
-            return "medium"
+        return contradictions[:3]
 
-    def _generate_final_recommendations(self, content: str) -> List[Dict[str, Any]]:
+    def _generate_final_recommendations(self, content: str) -> List[str]:
         """Generate final synthesized recommendations."""
         recommendations = []
 
-        # Extract recommendation sentences
-        rec_indicators = ['recommend', 'suggest', 'propose', 'advise', 'should', 'must']
-        sentences = content.split('.')
-
-        for sentence in sentences:
-            sentence_lower = sentence.lower()
-            if any(indicator in sentence_lower for indicator in rec_indicators):
-                recommendations.append({
-                    "recommendation": sentence.strip() + '.',
-                    "priority": self._determine_priority(sentence),
-                    "timeframe": self._determine_timeframe(sentence),
-                    "category": self._categorize_recommendation(sentence)
-                })
-
-        # Sort by priority
-        priority_order = {"high": 0, "medium": 1, "low": 2}
-        recommendations.sort(key=lambda x: priority_order.get(x["priority"], 3))
+        # Look for recommendation sections
+        lines = content.split('\n')
+        in_recommendations = False
 
-        return recommendations[:8]
-
-    def _determine_priority(self, sentence: str) -> str:
-        """Determine recommendation priority."""
-        sentence_lower = sentence.lower()
+        for line in lines:
+            if 'recommend' in line.lower() or 'conclusion' in line.lower():
+                in_recommendations = True
+            elif in_recommendations and line.strip():
+                recommendations.append(line.strip())
+                if len(recommendations) >= 5:
+                    break
 
-        if any(word in sentence_lower for word in ['urgent', 'immediate', 'critical', 'must']):
-            return "high"
-        elif any(word in sentence_lower for word in ['should', 'important', 'recommend']):
-            return "medium"
-        else:
-            return "low"
-
-    def _determine_timeframe(self, sentence: str) -> str:
-        """Determine recommendation timeframe."""
-        sentence_lower = sentence.lower()
-
-        if any(word in sentence_lower for word in ['immediate', 'now', 'urgent', 'asap']):
-            return "immediate"
-        elif any(word in sentence_lower for word in ['short-term', 'soon', 'near']):
-            return "short-term"
-        elif any(word in sentence_lower for word in ['long-term', 'future', 'eventually']):
-            return "long-term"
-        else:
-            return "medium-term"
-
-    def _categorize_recommendation(self, sentence: str) -> str:
-        """Categorize the type of recommendation."""
-        sentence_lower = sentence.lower()
-
-        if any(word in sentence_lower for word in ['strategy', 'strategic', 'plan']):
-            return "strategic"
-        elif any(word in sentence_lower for word in ['operational', 'process', 'procedure']):
-            return "operational"
-        elif any(word in sentence_lower for word in ['tactical', 'action', 'implement']):
-            return "tactical"
-        else:
-            return "general"
+        return recommendations
 
     def _create_executive_summary(self, content: str) -> str:
         """Create an executive summary of the synthesis."""
-        # Extract key sentences for summary
-        summary_parts = []
-
-        # Get opening statement
+        # Take first paragraph or create summary
         paragraphs = content.split('\n\n')
         if paragraphs:
-            opening = paragraphs[0][:200]
-            if len(paragraphs[0]) > 200:
-                opening += "..."
-            summary_parts.append(opening)
-
-        # Extract key findings
-        key_finding_indicators = ['key finding', 'main conclusion', 'importantly', 'notably']
-        for paragraph in paragraphs[1:]:
-            if any(indicator in paragraph.lower() for indicator in key_finding_indicators):
-                summary_parts.append(paragraph[:150] + "..." if len(paragraph) > 150 else paragraph)
-                if len(summary_parts) >= 3:
-                    break
-
-        # Add conclusion if present
-        if len(paragraphs) > 1:
-            conclusion = paragraphs[-1][:150]
-            if conclusion not in summary_parts:
-                summary_parts.append(conclusion + "..." if len(paragraphs[-1]) > 150 else conclusion)
-
-        return " ".join(summary_parts)
-
-    def _extract_action_items(self, content: str) -> List[Dict[str, str]]:
-        """Extract specific action items from synthesis."""
-        action_items = []
-        action_indicators = ['action:', 'task:', 'todo:', 'action item', 'next step', 'to do']
-
-        lines = content.split('\n')
-        for line in lines:
-            line_lower = line.lower()
-            if any(indicator in line_lower for indicator in action_indicators):
-                action_items.append({
-                    "action": line.strip(),
-                    "owner": "TBD",
-                    "deadline": "TBD",
-                    "status": "pending"
-                })
-            # Also check for numbered action items
-            elif line.strip() and line.strip()[0].isdigit() and 'action' in line_lower:
-                action_items.append({
-                    "action": line.strip(),
-                    "owner": "TBD",
-                    "deadline": "TBD",
-                    "status": "pending"
-                })
-
-        return action_items[:10]
-
-    def _assess_synthesis_confidence(self, content: str) -> Dict[str, Any]:
-        """Assess confidence in the synthesis."""
-        # Calculate various confidence indicators
-        word_count = len(content.split())
-
-        # Check for confidence language
-        high_confidence_words = ['clear', 'strong', 'definitive', 'conclusive', 'certain']
-        low_confidence_words = ['uncertain', 'unclear', 'tentative', 'preliminary', 'limited']
-
-        high_conf_count = sum(content.lower().count(word) for word in high_confidence_words)
-        low_conf_count = sum(content.lower().count(word) for word in low_confidence_words)
-
-        # Calculate confidence score
-        base_confidence = 0.7
-        confidence_adjustment = (high_conf_count * 0.05) - (low_conf_count * 0.08)
-        overall_confidence = max(0.3, min(0.95, base_confidence + confidence_adjustment))
-
-        return {
-            "overall": overall_confidence,
-            "level": "high" if overall_confidence > 0.8 else "medium" if overall_confidence > 0.6 else "low",
-            "factors": {
-                "content_depth": word_count > 500,
-                "evidence_strength": high_conf_count > low_conf_count,
-                "consensus_level": "consensus" in content.lower()
-            }
-        }
+            summary = paragraphs[0][:300]
+            if len(paragraphs[0]) > 300:
+                summary += "..."
+            return summary
+        return "Summary generation in progress..."
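All of the extraction helpers in the agents above (_extract_insights, _identify_consensus, _identify_contradictions, and so on) reduce to the same sentence-scan idiom; a minimal standalone distillation for reference — the helper name is hypothetical and not part of this patch:

# Hypothetical distillation of the sentence-scan idiom used by the extraction
# helpers above; not part of the patch itself.
def scan_sentences(content: str, keywords: list, limit: int = 5) -> list:
    """Return up to `limit` sentences that mention any of the keywords."""
    hits = []
    for sentence in content.split('.'):
        if any(kw in sentence.lower() for kw in keywords):
            hits.append(sentence.strip() + '.')
    return hits[:limit]

# e.g. scan_sentences(text, ['agree', 'consensus', 'shared']) mirrors _identify_consensus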
class CoordinatorAgent(BaseAgent): """Agent responsible for coordinating other agents and managing workflow.""" - def __init__(self, name: str, llm: Optional[ChatOpenAI] = None): + def __init__(self, name: str, llm: ChatOpenAI): super().__init__(name, AgentRole.COORDINATOR, llm) self.agents: Dict[str, BaseAgent] = {} self.task_queue: List[Task] = [] self.completed_tasks: List[Task] = [] self.workflow_graph = nx.DiGraph() self.execution_history: List[Dict[str, Any]] = [] - self.workflow_templates = self._create_workflow_templates() - - def _create_workflow_templates(self) -> Dict[str, List[Dict[str, Any]]]: - """Create predefined workflow templates for common problem types.""" - return { - "research_analysis": [ - {"role": "Researcher", "task": "Gather comprehensive information"}, - {"role": "Analyst", "task": "Analyze findings and identify patterns"}, - {"role": "Critic", "task": "Evaluate analysis quality"}, - {"role": "Synthesizer", "task": "Create final recommendations"} - ], - "strategic_planning": [ - {"role": "Researcher", "task": "Research current state and trends"}, - {"role": "Analyst", "task": "SWOT analysis and opportunity identification"}, - {"role": "Researcher", "task": "Benchmark best practices"}, - {"role": "Critic", "task": "Risk assessment and gap analysis"}, - {"role": "Synthesizer", "task": "Strategic plan synthesis"} - ], - "problem_solving": [ - {"role": "Researcher", "task": "Define problem and gather context"}, - {"role": "Analyst", "task": "Root cause analysis"}, - {"role": "Researcher", "task": "Research potential solutions"}, - {"role": "Critic", "task": "Evaluate solution feasibility"}, - {"role": "Synthesizer", "task": "Recommend optimal solution"} - ] - } def register_agent(self, agent: BaseAgent): """Register an agent with the coordinator.""" self.agents[agent.name] = agent self.workflow_graph.add_node(agent.name, role=agent.role.value) - logger.info(f"Registered agent: {agent.name} with role {agent.role.value}") - async def decompose_problem(self, problem: str, use_template: bool = False) -> List[Task]: + async def decompose_problem(self, problem: str) -> List[Task]: """Decompose a complex problem into subtasks.""" - if use_template: - # Try to match problem to a template - template_tasks = self._match_problem_to_template(problem) - if template_tasks: - return template_tasks - - if self.llm: - # Use LLM to decompose problem - prompt = ChatPromptTemplate.from_messages([ - SystemMessage(content="""You are a Coordinator Agent responsible for breaking down complex problems. - Decompose the problem into specific subtasks that can be assigned to specialized agents: - - Researcher: For gathering information and facts - - Analyst: For analyzing data and identifying patterns - - Critic: For evaluating quality and identifying issues - - Synthesizer: For combining insights and creating summaries - - Create 4-8 clear, actionable subtasks with dependencies. 
- Format each task as: [Role]: [Specific task description]"""), - HumanMessage(content=f"Decompose this problem into subtasks: {problem}") - ]) - - response = await self.llm.ainvoke(prompt.format_messages()) - tasks = self._parse_tasks(response.content, problem) - else: - # Demo mode - use template or default decomposition - tasks = self._create_default_tasks(problem) - - # Enhance tasks with metadata - for i, task in enumerate(tasks): - task.metadata['problem_complexity'] = self._assess_problem_complexity(problem) - task.metadata['estimated_duration'] = self._estimate_task_duration(task) + prompt = ChatPromptTemplate.from_messages([ + SystemMessage(content="""You are a Coordinator Agent responsible for breaking down complex problems. + Decompose the problem into specific subtasks that can be assigned to specialized agents: + - Researcher: For gathering information and facts + - Analyst: For analyzing data and identifying patterns + - Critic: For evaluating quality and identifying issues + - Synthesizer: For combining insights and creating summaries - return tasks - - def _match_problem_to_template(self, problem: str) -> Optional[List[Task]]: - """Match problem to a workflow template.""" - problem_lower = problem.lower() - - # Check for template matches - if any(word in problem_lower for word in ['strategy', 'strategic', 'plan']): - template_name = "strategic_planning" - elif any(word in problem_lower for word in ['research', 'analyze', 'investigate']): - template_name = "research_analysis" - elif any(word in problem_lower for word in ['problem', 'solve', 'solution', 'fix']): - template_name = "problem_solving" - else: - return None + Create clear, actionable subtasks with dependencies."""), + HumanMessage(content=f"Decompose this problem into subtasks: {problem}") + ]) - # Create tasks from template - template = self.workflow_templates[template_name] - tasks = [] + response = await self.llm.ainvoke(prompt.format_messages()) - for i, step in enumerate(template): - task = Task( - id=f"task_{i+1}", - description=f"{step['task']} for: {problem}", - metadata={ - "original_problem": problem, - "suggested_role": step['role'], - "template": template_name - } - ) - tasks.append(task) + # Parse response into tasks + tasks = self._parse_tasks(response.content, problem) return tasks - def _assess_problem_complexity(self, problem: str) -> str: - """Assess the complexity of a problem.""" - # Simple heuristic based on problem characteristics - complexity_indicators = { - "high": ['multiple', 'complex', 'comprehensive', 'strategic', 'long-term'], - "medium": ['analyze', 'evaluate', 'develop', 'assess'], - "low": ['simple', 'basic', 'straightforward', 'identify'] - } - - problem_lower = problem.lower() - - for level, indicators in complexity_indicators.items(): - if any(indicator in problem_lower for indicator in indicators): - return level - - # Default based on length - return "high" if len(problem) > 200 else "medium" - - def _estimate_task_duration(self, task: Task) -> float: - """Estimate task duration in seconds.""" - # Base estimation on task characteristics - base_duration = 30.0 - - # Adjust based on role - role_multipliers = { - "Researcher": 1.2, - "Analyst": 1.5, - "Critic": 1.0, - "Synthesizer": 1.3 - } - - role = task.metadata.get("suggested_role", "Researcher") - duration = base_duration * role_multipliers.get(role, 1.0) - - # Adjust based on complexity - complexity = task.metadata.get("problem_complexity", "medium") - if complexity == "high": - duration *= 1.5 - elif complexity == "low": - 
duration *= 0.7 - - return duration - def _parse_tasks(self, content: str, original_problem: str) -> List[Task]: """Parse LLM response into Task objects.""" tasks = [] @@ -1496,116 +626,60 @@ class CoordinatorAgent(BaseAgent): line = line.strip() if not line: continue - - # Check for role indicators - role_found = False + + # Check if line indicates a role for role in AgentRole: - if role.value in line or role.value.lower() in line.lower(): + if role.value in line: current_role = role.value - role_found = True break - # Extract task if we have a role - if current_role and ':' in line: - # Extract task description after role mention - task_parts = line.split(':', 1) - if len(task_parts) > 1: - task_desc = task_parts[1].strip() - - task = Task( - id=f"task_{task_id}", - description=task_desc, - metadata={ - "original_problem": original_problem, - "suggested_role": current_role, - "source": "llm_decomposition" - } - ) - - tasks.append(task) - task_id += 1 + # If line starts with number or dash, it's likely a task + if (line[0].isdigit() or line.startswith('-')) and current_role: + # Extract task description + task_desc = line.lstrip('0123456789.-').strip() + + task = Task( + id=f"task_{task_id}", + description=task_desc, + metadata={ + "original_problem": original_problem, + "suggested_role": current_role + } + ) + + tasks.append(task) + task_id += 1 - # Ensure we have at least some tasks - if len(tasks) < 4: - tasks.extend(self._create_default_tasks(original_problem)[len(tasks):]) + # If no tasks were parsed, create default tasks + if not tasks: + tasks = [ + Task(id="task_1", description=f"Research background information on: {original_problem}", + metadata={"suggested_role": "Researcher"}), + Task(id="task_2", description=f"Analyze key aspects of: {original_problem}", + metadata={"suggested_role": "Analyst"}), + Task(id="task_3", description="Critically evaluate the research and analysis", + metadata={"suggested_role": "Critic"}), + Task(id="task_4", description="Synthesize all findings into actionable insights", + metadata={"suggested_role": "Synthesizer"}) + ] return tasks - def _create_default_tasks(self, problem: str) -> List[Task]: - """Create default tasks for a problem.""" - return [ - Task( - id="task_1", - description=f"Research comprehensive background information on: {problem}", - metadata={"suggested_role": "Researcher", "source": "default"} - ), - Task( - id="task_2", - description=f"Analyze key factors and patterns related to: {problem}", - metadata={"suggested_role": "Analyst", "source": "default"} - ), - Task( - id="task_3", - description="Critically evaluate the research findings and analysis quality", - metadata={"suggested_role": "Critic", "source": "default"} - ), - Task( - id="task_4", - description="Synthesize all findings into actionable insights and recommendations", - metadata={"suggested_role": "Synthesizer", "source": "default"} - ) - ] - - def _build_dependency_graph(self, tasks: List[Task]): - """Build a dependency graph for tasks.""" - # Define role execution order - role_order = { - "Researcher": 1, - "Analyst": 2, - "Critic": 3, - "Synthesizer": 4 - } - - # Sort tasks by role order - sorted_tasks = sorted(tasks, - key=lambda t: role_order.get(t.metadata.get("suggested_role", "Researcher"), 5)) - - # Create dependencies based on order - for i in range(len(sorted_tasks) - 1): - current_task = sorted_tasks[i] - next_task = sorted_tasks[i + 1] - - # Only add dependency if next task has higher order role - current_order = 
role_order.get(current_task.metadata.get("suggested_role"), 0) - next_order = role_order.get(next_task.metadata.get("suggested_role"), 0) - - if next_order >= current_order: - next_task.dependencies.append(current_task.id) - async def execute_workflow(self, tasks: List[Task], parallel: bool = True) -> Dict[str, Any]: """Execute the workflow with given tasks.""" start_time = datetime.now() - self.performance_tracker.start_tracking() # Build task dependency graph self._build_dependency_graph(tasks) - # Update workflow graph - self._update_workflow_graph(tasks) - # Execute tasks - try: - if parallel: - await self._execute_parallel(tasks) - else: - await self._execute_sequential(tasks) - except Exception as e: - logger.error(f"Workflow execution error: {str(e)}") + if parallel: + results = await self._execute_parallel(tasks) + else: + results = await self._execute_sequential(tasks) # Compile final results end_time = datetime.now() - self.performance_tracker.end_tracking() - execution_time = (end_time - start_time).total_seconds() workflow_result = { @@ -1614,7 +688,6 @@ class CoordinatorAgent(BaseAgent): "success_rate": self._calculate_success_rate(tasks), "agent_contributions": self._compile_agent_contributions(tasks), "workflow_graph": self.workflow_graph, - "performance_metrics": self.performance_tracker.get_performance_summary(), "timestamp": datetime.now() } @@ -1622,73 +695,68 @@ class CoordinatorAgent(BaseAgent): return workflow_result - def _update_workflow_graph(self, tasks: List[Task]): - """Update the workflow graph with task relationships.""" - # Add task nodes - for task in tasks: - self.workflow_graph.add_node( - task.id, - task_description=task.description[:50] + "...", - status=task.status.value - ) + def _build_dependency_graph(self, tasks: List[Task]): + """Build a dependency graph for tasks.""" + # For simplicity, create a linear dependency chain based on suggested roles + role_order = ["Researcher", "Analyst", "Critic", "Synthesizer"] - # Add edges for dependencies + # Group tasks by role + tasks_by_role = {} for task in tasks: - for dep_id in task.dependencies: - self.workflow_graph.add_edge(dep_id, task.id) + role = task.metadata.get("suggested_role", "Researcher") + if role not in tasks_by_role: + tasks_by_role[role] = [] + tasks_by_role[role].append(task) + + # Create dependencies + for i in range(len(role_order) - 1): + current_role = role_order[i] + next_role = role_order[i + 1] + + if current_role in tasks_by_role and next_role in tasks_by_role: + for current_task in tasks_by_role[current_role]: + for next_task in tasks_by_role[next_role]: + next_task.dependencies.append(current_task.id) async def _execute_parallel(self, tasks: List[Task]) -> List[Task]: """Execute tasks in parallel where possible.""" completed = set() pending = tasks.copy() - with ThreadPoolExecutor(max_workers=len(self.agents)) as executor: - while pending: - # Find tasks ready for execution - ready_tasks = [ - task for task in pending - if all(dep in completed for dep in task.dependencies) - ] - - if not ready_tasks: - # Handle potential deadlock - logger.warning("No ready tasks found, executing first pending task") - ready_tasks = [pending[0]] if pending else [] - - if not ready_tasks: - break - - # Submit tasks for parallel execution - future_to_task = {} - for task in ready_tasks: - agent_name = self._select_agent_for_task(task) - if agent_name and agent_name in self.agents: - agent = self.agents[agent_name] - future = executor.submit(asyncio.run, agent.process_task(task)) - 
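#   The removed branch above drives each coroutine through executor.submit(asyncio.run, ...),
#   which spins up a fresh event loop per worker thread; the replacement below awaits the
#   coroutines directly with asyncio.gather. A self-contained sketch of that wave-by-wave
#   pattern (the task/dependency shapes here are illustrative, not the Task dataclass):
#
#       import asyncio
#
#       async def run_waves(deps: dict, work) -> list:
#           """deps maps task_id -> set of prerequisite ids; work(task_id) is a coroutine."""
#           done, order, pending = set(), [], dict(deps)
#           while pending:
#               ready = [t for t, d in pending.items() if d <= done]
#               if not ready:                 # mirrors the deadlock fallback used here
#                   ready = [next(iter(pending))]
#               await asyncio.gather(*(work(t) for t in ready))  # one parallel wave
#               for t in ready:
#                   done.add(t); order.append(t); del pending[t]
#           return order
#
#       # asyncio.run(run_waves({"t1": set(), "t2": {"t1"}, "t3": {"t1"}}, some_coro))
#       # executes t1 first, then t2 and t3 concurrently.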
future_to_task[future] = (task, agent_name) + while pending: + # Find tasks that can be executed (no pending dependencies) + ready_tasks = [ + task for task in pending + if all(dep in completed for dep in task.dependencies) + ] + + if not ready_tasks: + # Deadlock prevention - execute first pending task + ready_tasks = [pending[0]] + + # Execute ready tasks in parallel + task_futures = [] + for task in ready_tasks: + agent_name = self._select_agent_for_task(task) + if agent_name and agent_name in self.agents: + agent = self.agents[agent_name] + task_futures.append(agent.process_task(task)) + + # Update workflow graph + self.workflow_graph.add_edge( + self.name, agent_name, + task_id=task.id, + timestamp=datetime.now() + ) + + # Wait for tasks to complete + if task_futures: + completed_tasks = await asyncio.gather(*task_futures) - # Wait for tasks to complete - for future in as_completed(future_to_task): - task, agent_name = future_to_task[future] - try: - completed_task = future.result() - completed.add(task.id) - pending.remove(task) - self.completed_tasks.append(completed_task) - - # Update workflow graph - self.workflow_graph.add_edge( - self.name, agent_name, - task_id=task.id, - timestamp=datetime.now().isoformat() - ) - - # Record collaboration - await self._facilitate_collaboration(completed_task, agent_name) - - except Exception as e: - logger.error(f"Task {task.id} failed: {str(e)}") - task.status = TaskStatus.FAILED + for task in completed_tasks: + completed.add(task.id) + pending.remove(task) + self.completed_tasks.append(task) return tasks @@ -1698,19 +766,16 @@ class CoordinatorAgent(BaseAgent): agent_name = self._select_agent_for_task(task) if agent_name and agent_name in self.agents: agent = self.agents[agent_name] - completed_task = await agent.process_task(task) + await agent.process_task(task) # Update workflow graph self.workflow_graph.add_edge( self.name, agent_name, task_id=task.id, - timestamp=datetime.now().isoformat() + timestamp=datetime.now() ) - self.completed_tasks.append(completed_task) - - # Facilitate collaboration - await self._facilitate_collaboration(completed_task, agent_name) + self.completed_tasks.append(task) return tasks @@ -1721,59 +786,10 @@ class CoordinatorAgent(BaseAgent): # Find agent with matching role for agent_name, agent in self.agents.items(): if agent.role.value == suggested_role: - # Check agent availability - if agent.active and agent.current_task is None: - return agent_name - - # Fallback: find any available agent with the role - for agent_name, agent in self.agents.items(): - if agent.role.value == suggested_role: - return agent_name - - # Last resort: return any available agent - for agent_name, agent in self.agents.items(): - if agent.active: return agent_name - return None - - async def _facilitate_collaboration(self, task: Task, agent_name: str): - """Facilitate collaboration between agents after task completion.""" - if not task.result or task.status != TaskStatus.COMPLETED: - return - - # Create collaboration message - collab_message = AgentMessage( - sender=agent_name, - recipient="all", - content=f"Task completed: {task.description}\nKey findings: {str(task.result)[:500]}", - message_type=MessageType.COMPLETION_REPORT, - priority=3 - ) - - # Share with relevant agents - for other_agent_name, other_agent in self.agents.items(): - if other_agent_name != agent_name: - # Determine if collaboration is needed - if self._should_collaborate(task, other_agent): - await other_agent.memory.add_message(collab_message) - 
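#   The routing encoded in _should_collaborate (removed below) reduces to a small
#   role-to-role table; a standalone restatement (function name illustrative):
#
#       def should_receive(recipient_role: str, producer_role: str) -> bool:
#           routes = {
#               "Synthesizer": {"Researcher", "Analyst", "Critic"},  # receives all reports
#               "Critic": {"Researcher", "Analyst"},
#               "Analyst": {"Researcher"},
#           }
#           return producer_role in routes.get(recipient_role, set())
#
#       should_receive("Critic", "Researcher")   # True
#       should_receive("Researcher", "Analyst")  # False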
self.performance_tracker.record_collaboration() - - def _should_collaborate(self, task: Task, agent: BaseAgent) -> bool: - """Determine if an agent should receive collaboration message.""" - # Synthesizer should receive all completion reports - if agent.role == AgentRole.SYNTHESIZER: - return True - - # Critic should receive analysis and research results - if agent.role == AgentRole.CRITIC and task.metadata.get("suggested_role") in ["Researcher", "Analyst"]: - return True - - # Analyst should receive research results - if agent.role == AgentRole.ANALYST and task.metadata.get("suggested_role") == "Researcher": - return True - - return False + # Fallback to first available agent + return list(self.agents.keys())[0] if self.agents else None def _calculate_success_rate(self, tasks: List[Task]) -> float: """Calculate the success rate of task execution.""" @@ -1790,80 +806,18 @@ class CoordinatorAgent(BaseAgent): for agent_name, agent in self.agents.items(): agent_tasks = [task for task in tasks if task.assigned_to == agent_name] - if agent_tasks: - total_execution_time = sum( - task.performance_metrics.get('execution_time', 0) + contributions[agent_name] = { + "role": agent.role.value, + "tasks_completed": len(agent_tasks), + "average_confidence": np.mean([task.confidence for task in agent_tasks]) if agent_tasks else 0.0, + "total_execution_time": sum( + (task.completed_at - task.created_at).total_seconds() for task in agent_tasks + if task.completed_at ) - - avg_confidence = np.mean([task.confidence for task in agent_tasks]) - - contributions[agent_name] = { - "role": agent.role.value, - "tasks_completed": len(agent_tasks), - "average_confidence": avg_confidence, - "total_execution_time": total_execution_time, - "collaboration_count": len(agent.collaboration_partners), - "status": agent.get_status_summary() - } - else: - contributions[agent_name] = { - "role": agent.role.value, - "tasks_completed": 0, - "average_confidence": 0.0, - "total_execution_time": 0.0, - "collaboration_count": 0, - "status": agent.get_status_summary() - } + } return contributions - - def get_workflow_insights(self) -> Dict[str, Any]: - """Get insights about workflow execution patterns.""" - if not self.execution_history: - return {"message": "No execution history available"} - - # Analyze execution patterns - total_executions = len(self.execution_history) - avg_execution_time = np.mean([wf['execution_time'] for wf in self.execution_history]) - avg_success_rate = np.mean([wf['success_rate'] for wf in self.execution_history]) - - # Analyze agent performance - agent_stats = {} - for workflow in self.execution_history: - for agent, contrib in workflow['agent_contributions'].items(): - if agent not in agent_stats: - agent_stats[agent] = { - 'total_tasks': 0, - 'total_time': 0, - 'confidence_scores': [] - } - - agent_stats[agent]['total_tasks'] += contrib['tasks_completed'] - agent_stats[agent]['total_time'] += contrib['total_execution_time'] - if contrib['average_confidence'] > 0: - agent_stats[agent]['confidence_scores'].append(contrib['average_confidence']) - - # Calculate agent efficiency - agent_efficiency = {} - for agent, stats in agent_stats.items(): - if stats['total_tasks'] > 0: - agent_efficiency[agent] = { - 'avg_time_per_task': stats['total_time'] / stats['total_tasks'], - 'avg_confidence': np.mean(stats['confidence_scores']) if stats['confidence_scores'] else 0, - 'total_tasks': stats['total_tasks'] - } - - return { - 'total_workflows_executed': total_executions, - 'average_execution_time': avg_execution_time, 
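#   The agent rankings below are selected with min()/max() over dict items keyed on a
#   metric; a tiny illustration of the pattern (data values made up):
#
#       efficiency = {
#           "Researcher-1": {"avg_time_per_task": 28.0, "avg_confidence": 0.81},
#           "Analyst-1": {"avg_time_per_task": 41.0, "avg_confidence": 0.88},
#       }
#       min(efficiency.items(), key=lambda kv: kv[1]["avg_time_per_task"])[0]  # "Researcher-1"
#       max(efficiency.items(), key=lambda kv: kv[1]["avg_confidence"])[0]     # "Analyst-1"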
- 'average_success_rate': avg_success_rate, - 'agent_efficiency': agent_efficiency, - 'most_efficient_agent': min(agent_efficiency.items(), - key=lambda x: x[1]['avg_time_per_task'])[0] if agent_efficiency else None, - 'highest_quality_agent': max(agent_efficiency.items(), - key=lambda x: x[1]['avg_confidence'])[0] if agent_efficiency else None - } class WorkflowVisualizer: """Handles visualization of agent interactions and workflow.""" @@ -1871,87 +825,46 @@ class WorkflowVisualizer: def __init__(self): self.color_map = Config.NODE_COLORS self.layout_cache = {} - self.animation_frames = [] def create_workflow_graph(self, workflow_graph: nx.DiGraph, - active_agents: List[str] = None, - highlight_tasks: List[str] = None) -> go.Figure: + active_agents: List[str] = None) -> go.Figure: """Create an interactive workflow visualization.""" - if len(workflow_graph.nodes()) == 0: - return self._create_empty_graph() - # Use hierarchical layout pos = self._hierarchical_layout(workflow_graph) - # Create traces + # Create edge trace edge_trace = self._create_edge_trace(workflow_graph, pos) - node_trace = self._create_node_trace(workflow_graph, pos, active_agents, highlight_tasks) - - # Add agent status indicators - status_trace = self._create_status_indicators(workflow_graph, pos, active_agents) - - # Create figure with all traces - traces = [edge_trace, node_trace] - if status_trace: - traces.append(status_trace) - - fig = go.Figure( - data=traces, - layout=go.Layout( - title={ - 'text': 'Agent Collaboration Network', - 'x': 0.5, - 'xanchor': 'center', - 'font': {'size': 20, 'color': '#2c3e50'} - }, - titlefont_size=16, - showlegend=False, - hovermode='closest', - margin=dict(b=40, l=40, r=40, t=60), - xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), - yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), - plot_bgcolor='#f8f9fa', - paper_bgcolor='white', - font=dict(family='Inter, sans-serif'), - annotations=self._create_annotations(workflow_graph, pos) - ) - ) - # Add shapes for visual enhancement - fig.update_layout( - shapes=self._create_background_shapes(pos) - ) + # Create node trace + node_trace = self._create_node_trace(workflow_graph, pos, active_agents) + + # Create figure + fig = go.Figure(data=[edge_trace, node_trace], + layout=go.Layout( + title='Agent Collaboration Network', + titlefont_size=16, + showlegend=False, + hovermode='closest', + margin=dict(b=20, l=5, r=5, t=40), + xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), + yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), + plot_bgcolor='white' + )) return fig - def _create_empty_graph(self) -> go.Figure: - """Create empty graph with message.""" - fig = go.Figure() - fig.add_annotation( - text="No workflow data to display.
Start an analysis to see agent interactions.", - xref="paper", yref="paper", - x=0.5, y=0.5, - showarrow=False, - font=dict(size=16, color='#7f8c8d'), - bgcolor='#ecf0f1', - borderpad=20 - ) - fig.update_layout( - height=500, - plot_bgcolor='#f8f9fa', - paper_bgcolor='white' - ) - return fig - def _hierarchical_layout(self, G: nx.DiGraph) -> Dict: """Create hierarchical layout for the graph.""" - # Check cache - graph_hash = hash(tuple(sorted(G.edges()))) + if len(G) == 0: + return {} + + # Try to use cached layout if graph structure hasn't changed + graph_hash = hash(tuple(G.edges())) if graph_hash in self.layout_cache: return self.layout_cache[graph_hash] - # Define role hierarchy with better spacing + # Define role hierarchy hierarchy = { 'Coordinator': 0, 'Researcher': 1, @@ -1960,35 +873,26 @@ class WorkflowVisualizer: 'Synthesizer': 4 } - # Group nodes by type - agent_nodes = [n for n in G.nodes() if G.nodes[n].get('role')] - task_nodes = [n for n in G.nodes() if 'task_' in str(n)] - + # Position nodes based on hierarchy pos = {} + role_counts = {} - # Position agent nodes - role_x_positions = {} - for i, (role, level) in enumerate(hierarchy.items()): - role_x_positions[role] = i * 3 - 6 # Spread horizontally - - for node in agent_nodes: + for node in G.nodes(): role = G.nodes[node].get('role', 'Unknown') level = hierarchy.get(role, 5) - x = role_x_positions.get(role, 0) - y = -level * 2.5 + + if level not in role_counts: + role_counts[level] = 0 + + x = role_counts[level] * 2 - (len([n for n in G.nodes() if hierarchy.get(G.nodes[n].get('role'), 5) == level]) - 1) + y = -level * 2 + pos[node] = (x, y) + role_counts[level] += 1 - # Position task nodes in a grid on the right - if task_nodes: - task_start_x = max(role_x_positions.values()) + 4 - cols = 3 - for i, task in enumerate(task_nodes): - row = i // cols - col = i % cols - pos[task] = (task_start_x + col * 2, -row * 1.5) - - # Cache layout + # Cache the layout self.layout_cache[graph_hash] = pos + return pos def _create_edge_trace(self, G: nx.DiGraph, pos: Dict) -> go.Scatter: @@ -1997,214 +901,82 @@ class WorkflowVisualizer: edge_y = [] for edge in G.edges(): - if edge[0] in pos and edge[1] in pos: - x0, y0 = pos[edge[0]] - x1, y1 = pos[edge[1]] - - # Add bezier curve points for smoother edges - mid_x = (x0 + x1) / 2 - mid_y = (y0 + y1) / 2 - 0.5 # Slight curve - - # Create curved edge - t = np.linspace(0, 1, 10) - for ti in t: - x = (1-ti)**2 * x0 + 2*(1-ti)*ti * mid_x + ti**2 * x1 - y = (1-ti)**2 * y0 + 2*(1-ti)*ti * mid_y + ti**2 * y1 - edge_x.append(x) - edge_y.append(y) - edge_x.append(None) - edge_y.append(None) + x0, y0 = pos.get(edge[0], (0, 0)) + x1, y1 = pos.get(edge[1], (0, 0)) + edge_x.extend([x0, x1, None]) + edge_y.extend([y0, y1, None]) edge_trace = go.Scatter( x=edge_x, y=edge_y, - line=dict(width=2, color='#bdc3c7'), + line=dict(width=2, color='#888'), hoverinfo='none', - mode='lines', - opacity=0.6 + mode='lines' ) return edge_trace def _create_node_trace(self, G: nx.DiGraph, pos: Dict, - active_agents: List[str] = None, - highlight_tasks: List[str] = None) -> go.Scatter: + active_agents: List[str] = None) -> go.Scatter: """Create node trace for the graph.""" node_x = [] node_y = [] node_colors = [] node_sizes = [] node_text = [] - node_hover = [] - node_symbols = [] for node in G.nodes(): - if node in pos: - x, y = pos[node] - node_x.append(x) - node_y.append(y) - - # Get node attributes - node_data = G.nodes[node] - role = node_data.get('role', '') - - # Determine node properties - if role: # Agent node - 
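#   Plotly renders hover labels as HTML, so the multi-line hover strings built in this
#   method are joined with <br> tags rather than newlines. A hedged standalone example
#   (figure contents illustrative):
#
#       import plotly.graph_objects as go
#       fig = go.Figure(go.Scatter(
#           x=[0], y=[0], mode="markers",
#           hovertext=["Researcher-1<br>Role: Researcher<br>Status: Active"],  # <br>, not \n
#           hoverinfo="text",
#       ))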
                    color = self.color_map.get(role, '#95a5a6')
-                    size = 35
-                    symbol = 'circle'
-
-                    # Highlight active agents
-                    if active_agents and node in active_agents:
-                        size = 45
-                        color = self._brighten_color(color)
-
-                    hover_text = f"{node}<br>Role: {role}<br>Status: Active"
-                    display_text = node.split('-')[0]  # Show role name
-
-                else:  # Task node
-                    status = node_data.get('status', 'pending')
-                    color = self._get_status_color(status)
-                    size = 25
-                    symbol = 'square'
-
-                    # Highlight specific tasks
-                    if highlight_tasks and str(node) in highlight_tasks:
-                        size = 30
-                        color = self._brighten_color(color)
-
-                    task_desc = node_data.get('task_description', 'Task')
-                    hover_text = f"{node}<br>{task_desc}<br>Status: {status}"
-                    display_text = node if len(str(node)) < 10 else f"T{node[-1]}"
-
-                node_colors.append(color)
-                node_sizes.append(size)
-                node_text.append(display_text)
-                node_hover.append(hover_text)
-                node_symbols.append(symbol)
+            x, y = pos.get(node, (0, 0))
+            node_x.append(x)
+            node_y.append(y)
+
+            # Get node attributes
+            role = G.nodes[node].get('role', 'Unknown')
+            color = self.color_map.get(role, '#666')
+
+            # Highlight active agents
+            if active_agents and node in active_agents:
+                size = 30
+                color = self._brighten_color(color)
+            else:
+                size = 20
+
+            node_colors.append(color)
+            node_sizes.append(size)
+
+            # Create hover text
+            degree = G.degree(node)
+            hover_text = f"{node}
Role: {role}
Connections: {degree}" + node_text.append(hover_text) node_trace = go.Scatter( x=node_x, y=node_y, mode='markers+text', hoverinfo='text', - text=node_text, - hovertext=node_hover, - textposition="bottom center", - textfont=dict(size=10, color='#2c3e50'), + text=[node for node in G.nodes()], + textposition="top center", + hovertext=node_text, marker=dict( showscale=False, color=node_colors, size=node_sizes, - symbol=node_symbols, line=dict(color='white', width=2) ) ) return node_trace - def _create_status_indicators(self, G: nx.DiGraph, pos: Dict, - active_agents: List[str] = None) -> Optional[go.Scatter]: - """Create status indicators for active agents.""" - if not active_agents: - return None - - indicator_x = [] - indicator_y = [] - - for agent in active_agents: - if agent in pos: - x, y = pos[agent] - # Place indicator above agent - indicator_x.append(x) - indicator_y.append(y + 0.5) - - if not indicator_x: - return None - - return go.Scatter( - x=indicator_x, - y=indicator_y, - mode='markers', - marker=dict( - size=8, - color='#2ecc71', - symbol='circle', - line=dict(width=2, color='white') - ), - hoverinfo='skip' - ) - - def _create_annotations(self, G: nx.DiGraph, pos: Dict) -> List[Dict]: - """Create annotations for the graph.""" - annotations = [] - - # Add role labels - role_positions = {} - for node in G.nodes(): - if G.nodes[node].get('role'): - role = G.nodes[node]['role'] - if role not in role_positions: - x, y = pos[node] - role_positions[role] = (x, y - 0.8) - - for role, (x, y) in role_positions.items(): - annotations.append(dict( - x=x, y=y, - text=f"{role}", - showarrow=False, - font=dict(size=12, color='#34495e'), - opacity=0.8 - )) - - return annotations - - def _create_background_shapes(self, pos: Dict) -> List[Dict]: - """Create background shapes for visual grouping.""" - shapes = [] - - # Group nodes by x-coordinate regions - if pos: - x_coords = [p[0] for p in pos.values()] - y_coords = [p[1] for p in pos.values()] - - if x_coords and y_coords: - # Create subtle background regions - padding = 1.5 - shapes.append(dict( - type='rect', - x0=min(x_coords) - padding, - y0=min(y_coords) - padding, - x1=max(x_coords) + padding, - y1=max(y_coords) + padding, - fillcolor='#ecf0f1', - opacity=0.3, - line=dict(width=0) - )) - - return shapes - - def _get_status_color(self, status: str) -> str: - """Get color based on task status.""" - status_colors = { - 'completed': '#2ecc71', - 'in_progress': '#f39c12', - 'pending': '#95a5a6', - 'failed': '#e74c3c' - } - return status_colors.get(status, '#95a5a6') - def _brighten_color(self, color: str) -> str: """Make a color brighter for highlighting.""" + # Simple brightening by mixing with white if color.startswith('#'): - # Convert hex to RGB r = int(color[1:3], 16) g = int(color[3:5], 16) b = int(color[5:7], 16) - # Brighten by mixing with white - factor = 0.3 - r = int(r + (255 - r) * factor) - g = int(g + (255 - g) * factor) - b = int(b + (255 - b) * factor) + # Mix with white (255, 255, 255) + r = int(r + (255 - r) * 0.3) + g = int(g + (255 - g) * 0.3) + b = int(b + (255 - b) * 0.3) return f"#{r:02x}{g:02x}{b:02x}" @@ -2213,113 +985,64 @@ class WorkflowVisualizer: def create_task_timeline(self, tasks: List[Task]) -> go.Figure: """Create a timeline visualization of task execution.""" - # Prepare timeline data + # Prepare data for timeline timeline_data = [] for task in tasks: - if task.created_at: - # Use completed_at if available, otherwise estimate - end_time = task.completed_at if task.completed_at else task.created_at + 
timedelta(seconds=30)
-
+            if task.created_at and task.completed_at:
                timeline_data.append({
                    'Task': task.id,
                    'Agent': task.assigned_to or 'Unassigned',
                    'Start': task.created_at,
-                    'Finish': end_time,
+                    'Finish': task.completed_at,
                    'Status': task.status.value,
                    'Confidence': task.confidence
                })

        if not timeline_data:
-            return self._create_empty_timeline()
+            # Return empty figure if no completed tasks
+            fig = go.Figure()
+            fig.add_annotation(
+                text="No completed tasks to display",
+                xref="paper", yref="paper",
+                x=0.5, y=0.5,
+                showarrow=False
+            )
+            return fig

-        # Create DataFrame
        df = pd.DataFrame(timeline_data)

-        # Create Gantt chart with custom styling
+        # Create Gantt chart (plotly.express is imported locally because the
+        # module-level "import plotly.express as px" was removed in this change)
+        import plotly.express as px
        fig = px.timeline(
            df,
            x_start="Start",
            x_end="Finish",
            y="Agent",
            color="Confidence",
-            hover_data=["Task", "Status"],
-            color_continuous_scale="Viridis",
-            labels={'Confidence': 'Confidence Score'}
+            hover_data=["Task", "Status", "Confidence"],
+            title="Task Execution Timeline",
+            color_continuous_scale="Viridis"
        )

-        # Update layout for professional appearance
-        fig.update_layout(
-            title={
-                'text': 'Task Execution Timeline',
-                'x': 0.5,
-                'xanchor': 'center',
-                'font': {'size': 18, 'color': '#2c3e50'}
-            },
-            height=400,
-            xaxis_title="Time",
-            yaxis_title="Agent",
-            plot_bgcolor='#f8f9fa',
-            paper_bgcolor='white',
-            font=dict(family='Inter, sans-serif'),
-            yaxis={'categoryorder': 'total ascending'}
-        )
+        # Figure objects expose update_yaxes (plural), not update_yaxis
+        fig.update_yaxes(categoryorder="total ascending")
+        fig.update_layout(height=400)

-        # Add grid lines
-        fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='#e0e0e0')
-        fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#e0e0e0')
-
-        return fig
-
-    def _create_empty_timeline(self) -> go.Figure:
-        """Create empty timeline with message."""
-        fig = go.Figure()
-        fig.add_annotation(
-            text="No task execution data available yet.",
-            xref="paper", yref="paper",
-            x=0.5, y=0.5,
-            showarrow=False,
-            font=dict(size=14, color='#7f8c8d')
-        )
-        fig.update_layout(
-            height=400,
-            plot_bgcolor='#f8f9fa',
-            paper_bgcolor='white'
-        )
        return fig

    def create_confidence_heatmap(self, agent_contributions: Dict[str, Any]) -> go.Figure:
-        """Create a heatmap showing agent performance metrics."""
-
-        if not agent_contributions:
-            return self._create_empty_heatmap()
+        """Create a heatmap showing agent confidence levels."""

-        # Prepare data
        agents = list(agent_contributions.keys())
-        metrics = ['Tasks Completed', 'Avg Confidence', 'Time Efficiency', 'Collaboration Score']
+        metrics = ['tasks_completed', 'average_confidence', 'total_execution_time']

-        # Create data matrix
+        # Normalize data for heatmap
        data = []
        for metric in metrics:
            row = []
            for agent in agents:
-                contrib = agent_contributions[agent]
-
-                if metric == 'Tasks Completed':
-                    value = contrib.get('tasks_completed', 0) / 10.0  # Normalize
-                elif metric == 'Avg Confidence':
-                    value = contrib.get('average_confidence', 0)
-                elif metric == 'Time Efficiency':
-                    # Inverse of average time per task, normalized
-                    time = contrib.get('total_execution_time', 1)
-                    tasks = contrib.get('tasks_completed', 1)
-                    avg_time = time / tasks if tasks > 0 else float('inf')
-                    value = min(1.0, 30.0 / avg_time) if avg_time > 0 else 0
-                elif metric == 'Collaboration Score':
-                    value = min(1.0, contrib.get('collaboration_count', 0) / 5.0)
-                else:
-                    value = 0
-
+                value = agent_contributions[agent].get(metric, 0)
+                if metric == 'total_execution_time':
+                    value = value / 60  # Convert to minutes
                row.append(value)
            data.append(row)

        # Create heatmap
        fig = 
go.Figure(data=go.Heatmap( z=data, x=agents, - y=metrics, + y=['Tasks Completed', 'Avg Confidence', 'Time (min)'], colorscale='Blues', text=np.round(data, 2), texttemplate='%{text}', - textfont={"size": 12}, - colorbar=dict(title="Score", titleside="right"), - hoverongaps=False + textfont={"size": 10} )) - # Update layout fig.update_layout( - title={ - 'text': 'Agent Performance Metrics', - 'x': 0.5, - 'xanchor': 'center', - 'font': {'size': 18, 'color': '#2c3e50'} - }, + title="Agent Performance Metrics", xaxis_title="Agents", yaxis_title="Metrics", - height=350, - plot_bgcolor='white', - paper_bgcolor='white', - font=dict(family='Inter, sans-serif') - ) - - # Add cell borders - fig.update_xaxes(showgrid=False, showline=True, linewidth=2, linecolor='#2c3e50') - fig.update_yaxes(showgrid=False, showline=True, linewidth=2, linecolor='#2c3e50') - - return fig - - def _create_empty_heatmap(self) -> go.Figure: - """Create empty heatmap with message.""" - fig = go.Figure() - fig.add_annotation( - text="No agent performance data available yet.", - xref="paper", yref="paper", - x=0.5, y=0.5, - showarrow=False, - font=dict(size=14, color='#7f8c8d') - ) - fig.update_layout( - height=350, - plot_bgcolor='#f8f9fa', - paper_bgcolor='white' - ) - return fig - - def create_performance_comparison(self, performance_metrics: Dict[str, Any]) -> go.Figure: - """Create performance comparison visualization.""" - - # Extract metrics - baseline_time = Config.BENCHMARK_BASELINE['single_agent_time'] - actual_time = performance_metrics.get('average_task_completion_time', baseline_time) - time_improvement = performance_metrics.get('time_improvement_percentage', 0) - - # Create bar chart comparing performance - categories = ['Single Agent', 'Multi-Agent System'] - values = [baseline_time, actual_time] - colors = ['#e74c3c', '#2ecc71'] - - fig = go.Figure(data=[ - go.Bar( - x=categories, - y=values, - marker_color=colors, - text=[f'{v:.1f}s' for v in values], - textposition='auto' - ) - ]) - - # Add improvement annotation - fig.add_annotation( - x=1, y=actual_time + 5, - text=f"{time_improvement:.1f}% Faster", - showarrow=True, - arrowhead=2, - arrowsize=1, - arrowwidth=2, - arrowcolor='#2ecc71', - font=dict(size=14, color='#2ecc71', weight='bold') - ) - - # Update layout - fig.update_layout( - title={ - 'text': 'Performance Comparison', - 'x': 0.5, - 'xanchor': 'center', - 'font': {'size': 18, 'color': '#2c3e50'} - }, - yaxis_title="Average Completion Time (seconds)", - height=400, - plot_bgcolor='#f8f9fa', - paper_bgcolor='white', - font=dict(family='Inter, sans-serif'), - showlegend=False + height=300 ) return fig @@ -2438,8 +1076,7 @@ class ReportGenerator: 'agent_contributions': self._generate_agent_contributions, 'key_findings': self._generate_key_findings, 'recommendations': self._generate_recommendations, - 'confidence_analysis': self._generate_confidence_analysis, - 'performance_metrics': self._generate_performance_metrics + 'confidence_analysis': self._generate_confidence_analysis } def generate_report(self, @@ -2460,36 +1097,21 @@ class ReportGenerator: for section in include_sections: if section in self.section_generators: section_content = self.section_generators[section](workflow_result, problem_statement) - if section_content: - report_sections.append(section_content) + report_sections.append(section_content) # Footer report_sections.append(self._generate_footer(workflow_result)) - # Format the complete report - complete_report = "\n\n".join(report_sections) - - # Add CSS styling for better markdown 
rendering
-        styled_report = f"""<div>
-
-{complete_report}
-
-</div>"""
-
-        return styled_report
+        return "\n\n".join(report_sections)

    def _generate_header(self, problem_statement: str) -> str:
        """Generate report header."""
        return f"""# Multi-Agent Analysis Report
-<div>
+**Generated:** {datetime.now().strftime('%B %d, %Y at %I:%M %p')}

-**Generated:** {datetime.now().strftime('%B %d, %Y at %I:%M %p')}
-**System:** {Config.COMPANY_NAME}

**Problem Statement:** {problem_statement}

-</div>

---"""

    def _generate_executive_summary(self, workflow_result: Dict[str, Any],
@@ -2498,41 +1120,25 @@
                                    problem_statement: str) -> str:
        """Generate executive summary."""
        tasks = workflow_result.get('tasks', [])
        success_rate = workflow_result.get('success_rate', 0)
        execution_time = workflow_result.get('execution_time', 0)
-        performance = workflow_result.get('performance_metrics', {})

-        # Find synthesis task for summary content
+        # Find synthesis results
        synthesis_task = None
        for task in tasks:
            if task.assigned_to and 'Synthesizer' in task.assigned_to:
                synthesis_task = task
                break

-        summary_content = ""
-        if synthesis_task and isinstance(synthesis_task.result, dict):
-            exec_summary = synthesis_task.result.get('executive_summary', '')
-            if exec_summary:
-                summary_content = f"\n\n{exec_summary}"
-
-        # Calculate performance improvement
-        time_improvement = performance.get('time_improvement_percentage', 0)
-        efficiency_score = performance.get('efficiency_score', 0)

        summary = f"""## Executive Summary
-<div>
-
-**Key Performance Indicators:**
-- **Task Completion Rate:** {success_rate:.0%}
-- **Total Execution Time:** {execution_time:.1f} seconds
-- **Performance Improvement:** {time_improvement:.1f}% faster than single-agent approach
-- **System Efficiency Score:** {efficiency_score:.2f}/1.0
-
-</div>
-
-The multi-agent system successfully analyzed the problem through coordinated efforts of specialized agents, achieving superior results compared to traditional single-agent approaches.
-{summary_content}
+The multi-agent system successfully analyzed the problem with a **{success_rate:.0%} task completion rate** in **{execution_time:.1f} seconds**.
-
-**Bottom Line:** The analysis provides actionable insights with high confidence, leveraging the collective intelligence of multiple specialized agents."""
+"""
+
+        if synthesis_task and synthesis_task.result:
+            if isinstance(synthesis_task.result, dict) and 'executive_summary' in synthesis_task.result:
+                summary += synthesis_task.result['executive_summary']
+            else:
+                summary += "The analysis revealed key insights across multiple dimensions of the problem."

        return summary

@@ -2541,28 +1147,19 @@
    def _generate_task_analysis(self, workflow_result: Dict[str, Any],
                                problem_statement: str) -> str:
        """Generate task analysis section."""
        tasks = workflow_result.get('tasks', [])

-        if not tasks:
-            return ""
-
        content = "## Task Analysis\n\n"

-        # Task overview
+        # Group tasks by status
        completed_tasks = [t for t in tasks if t.status == TaskStatus.COMPLETED]
        failed_tasks = [t for t in tasks if t.status == TaskStatus.FAILED]

-        content += f"""<div>
-
-**Task Overview:**
-- Total Tasks: {len(tasks)}
-- Completed: {len(completed_tasks)} ✓
-- Failed: {len(failed_tasks)} ✗
-- Average Confidence: {np.mean([t.confidence for t in completed_tasks]) if completed_tasks else 0:.2%}
-
-</div>
-
-### Task Breakdown by Agent\n\n"""
+        content += f"**Total Tasks:** {len(tasks)}\n"
+        content += f"**Completed:** {len(completed_tasks)}\n"
+        content += f"**Failed:** {len(failed_tasks)}\n\n"
+
+        # List tasks by agent
+        content += "### Tasks by Agent\n\n"

-        # Group tasks by agent
        agents_tasks = {}
        for task in tasks:
            agent = task.assigned_to or "Unassigned"
            if agent not in agents_tasks:
                agents_tasks[agent] = []
            agents_tasks[agent].append(task)

        for agent, agent_tasks in agents_tasks.items():
-            # Get agent role
-            role = "Unknown"
-            for t in agent_tasks:
-                if t.metadata.get('suggested_role'):
-                    role = t.metadata['suggested_role']
-                    break
-
-            content += f"#### {agent} ({role})\n\n"
-
+            content += f"**{agent}:**\n"
            for task in agent_tasks:
-                status_icon = "✓" if task.status == TaskStatus.COMPLETED else "✗"
-                confidence_bar = self._create_confidence_bar(task.confidence)
-                exec_time = task.performance_metrics.get('execution_time', 0)
-
-                content += f"""<div>
-
-**{status_icon} Task {task.id}**
-*Description:* {task.description}
-*Execution Time:* {exec_time:.1f}s
-*Confidence:* {confidence_bar} {task.confidence:.0%}
-
-</div>
-"""
+                status_emoji = "✓" if task.status == TaskStatus.COMPLETED else "✗"
+                confidence = f"({task.confidence:.0%} confidence)" if task.confidence > 0 else ""
+                content += f"- {status_emoji} {task.description} {confidence}\n"
+            content += "\n"

        return content

-    def _create_confidence_bar(self, confidence: float) -> str:
-        """Create visual confidence bar."""
-        filled = int(confidence * 10)
-        empty = 10 - filled
-        return "▰" * filled + "▱" * empty
-
    def _generate_agent_contributions(self, workflow_result: Dict[str, Any],
                                      problem_statement: str) -> str:
        """Generate agent contributions section."""
        contributions = workflow_result.get('agent_contributions', {})

-        if not contributions:
-            return ""
-
        content = "## Agent Contributions\n\n"

-        # Create contribution cards for each agent
        for agent, stats in contributions.items():
            role = stats.get('role', 'Unknown')
            tasks_completed = stats.get('tasks_completed', 0)
            avg_confidence = stats.get('average_confidence', 0)
            exec_time = stats.get('total_execution_time', 0)
-            collab_count = stats.get('collaboration_count', 0)

-            # Get role color
-            role_color = Config.NODE_COLORS.get(role, '#95a5a6')
-
-            content += f"""<div style="border-left: 4px solid {role_color}">
-
-### {agent}
-**Role:** {role}
-
-| Metric | Value |
-|--------|-------|
-| Tasks Completed | {tasks_completed} |
-| Average Confidence | {avg_confidence:.0%} |
-| Total Execution Time | {exec_time:.1f}s |
-| Collaborations | {collab_count} |
-
-</div>
-""" + content += f"### {agent} ({role})\n" + content += f"- Tasks Completed: {tasks_completed}\n" + content += f"- Average Confidence: {avg_confidence:.0%}\n" + content += f"- Total Execution Time: {exec_time:.1f}s\n\n" return content @@ -2648,50 +1204,37 @@ The multi-agent system successfully analyzed the problem through coordinated eff content = "## Key Findings\n\n" - findings_by_type = { - 'Research Findings': [], - 'Analytical Insights': [], - 'Critical Observations': [], - 'Synthesized Conclusions': [] - } - # Extract findings from different agent types for task in tasks: if task.status == TaskStatus.COMPLETED and task.result: - role = task.metadata.get('suggested_role', '') + agent_role = task.metadata.get('suggested_role', '') - if isinstance(task.result, dict): - if 'Researcher' in role: - if 'key_points' in task.result: - findings_by_type['Research Findings'].extend(task.result['key_points'][:3]) - - elif 'Analyst' in role: - if 'insights' in task.result: - findings_by_type['Analytical Insights'].extend(task.result['insights'][:3]) - - elif 'Critic' in role: - if 'strengths' in task.result: - for strength in task.result['strengths'][:2]: - if isinstance(strength, dict): - findings_by_type['Critical Observations'].append(strength.get('strength', str(strength))) - else: - findings_by_type['Critical Observations'].append(str(strength)) + if 'Researcher' in agent_role and isinstance(task.result, dict): + if 'key_points' in task.result: + content += "### Research Findings\n" + for point in task.result['key_points'][:5]: + content += f"- {point}\n" + content += "\n" + + elif 'Analyst' in agent_role and isinstance(task.result, dict): + if 'insights' in task.result: + content += "### Analytical Insights\n" + for insight in task.result['insights'][:5]: + content += f"- {insight}\n" + content += "\n" + + elif 'Critic' in agent_role and isinstance(task.result, dict): + if 'strengths' in task.result: + content += "### Identified Strengths\n" + for strength in task.result['strengths'][:3]: + content += f"- {strength}\n" + content += "\n" - elif 'Synthesizer' in role: - if 'key_themes' in task.result: - for theme in task.result['key_themes'][:2]: - if isinstance(theme, dict): - findings_by_type['Synthesized Conclusions'].append(theme.get('description', str(theme))) - else: - findings_by_type['Synthesized Conclusions'].append(str(theme)) - - # Format findings - for finding_type, findings in findings_by_type.items(): - if findings: - content += f"### {finding_type}\n\n" - for finding in findings: - content += f"- {finding}\n" - content += "\n" + if 'gaps' in task.result: + content += "### Identified Gaps\n" + for gap in task.result['gaps'][:3]: + content += f"- {gap}\n" + content += "\n" return content @@ -2708,67 +1251,53 @@ The multi-agent system successfully analyzed the problem through coordinated eff for task in tasks: if task.status == TaskStatus.COMPLETED and task.result: if isinstance(task.result, dict): - # Check different possible recommendation fields - for field in ['recommendations', 'final_recommendations', 'improvements']: - if field in task.result: - recs = task.result[field] - for rec in recs: - if isinstance(rec, dict): - all_recommendations.append(rec) - else: - all_recommendations.append({ - 'recommendation': str(rec), - 'priority': 'medium', - 'source': task.assigned_to - }) - - if not all_recommendations: - return content + "*No specific recommendations were generated.*\n" - - # Categorize by priority + if 'recommendations' in task.result: + 
                        all_recommendations.extend(task.result['recommendations'])
+                    elif 'final_recommendations' in task.result:
+                        all_recommendations.extend(task.result['final_recommendations'])
+                    elif 'improvements' in task.result:
+                        all_recommendations.extend(task.result['improvements'])
+
+        # Remove duplicates and limit
+        unique_recommendations = []
+        seen = set()
+        for rec in all_recommendations:
+            rec_lower = rec.lower()
+            if rec_lower not in seen:
+                seen.add(rec_lower)
+                unique_recommendations.append(rec)
+
+        # Categorize recommendations
        high_priority = []
        medium_priority = []
        low_priority = []

-        for rec in all_recommendations:
-            priority = rec.get('priority', 'medium')
-            if priority == 'high':
+        for rec in unique_recommendations[:10]:
+            # Simple prioritization based on keywords
+            if any(word in rec.lower() for word in ['critical', 'must', 'essential', 'immediately']):
                high_priority.append(rec)
-            elif priority == 'low':
-                low_priority.append(rec)
-            else:
+            elif any(word in rec.lower() for word in ['should', 'recommend', 'important']):
                medium_priority.append(rec)
+            else:
+                low_priority.append(rec)

-        # Format recommendations
        if high_priority:
-            content += """<div>
-
-### 🔴 High Priority
-
-"""
-            for rec in high_priority[:3]:
-                content += f"- {rec.get('recommendation', rec)}\n"
-            content += "\n</div>\n"
+            content += "### High Priority\n"
+            for rec in high_priority:
+                content += f"- {rec}\n"
+            content += "\n"

        if medium_priority:
-            content += """<div>
-
-### 🟡 Medium Priority
-
-"""
-            for rec in medium_priority[:4]:
-                content += f"- {rec.get('recommendation', rec)}\n"
-            content += "\n</div>\n"
+            content += "### Medium Priority\n"
+            for rec in medium_priority:
+                content += f"- {rec}\n"
+            content += "\n"

        if low_priority:
-            content += """<div>
-
-### 🟢 Low Priority
-
-"""
-            for rec in low_priority[:3]:
-                content += f"- {rec.get('recommendation', rec)}\n"
-            content += "\n</div>\n"
+            content += "### Low Priority\n"
+            for rec in low_priority:
+                content += f"- {rec}\n"
+            content += "\n"

        return content

@@ -2780,102 +1309,28 @@
    def _generate_confidence_analysis(self, workflow_result: Dict[str, Any],
                                      problem_statement: str) -> str:
        """Generate confidence analysis section."""
        tasks = workflow_result.get('tasks', [])
        contributions = workflow_result.get('agent_contributions', {})

        content = "## Confidence Analysis\n\n"

-        # Calculate overall confidence
-        task_confidences = [t.confidence for t in tasks if t.confidence > 0]
-        overall_confidence = np.mean(task_confidences) if task_confidences else 0
-
-        # Create confidence gauge
-        confidence_level = "High" if overall_confidence > 0.8 else "Medium" if overall_confidence > 0.6 else "Low"
-        confidence_color = "#2ecc71" if overall_confidence > 0.8 else "#f39c12" if overall_confidence > 0.6 else "#e74c3c"
-
-        content += f"""<div>
-<div>Overall Confidence Score</div>
-<div style="color: {confidence_color}">{overall_confidence:.0%}</div>
-<div>Level: {confidence_level}</div>
-</div>
- -### Confidence by Agent Role - -""" + # Overall confidence + overall_confidence = np.mean([t.confidence for t in tasks if t.confidence > 0]) + content += f"**Overall Confidence Score:** {overall_confidence:.0%}\n\n" - # Create table of agent confidences - content += "| Agent Role | Average Confidence | Tasks |\n" - content += "|------------|-------------------|-------|\n" + # Confidence by agent type + content += "### Confidence by Agent Role\n" for agent, stats in contributions.items(): - role = stats.get('role', 'Unknown') avg_conf = stats.get('average_confidence', 0) - tasks = stats.get('tasks_completed', 0) - content += f"| {role} | {avg_conf:.0%} | {tasks} |\n" + role = stats.get('role', 'Unknown') + content += f"- **{role}**: {avg_conf:.0%}\n" - content += "\n### Confidence Distribution\n\n" + content += "\n### Confidence Distribution\n" # Categorize confidence levels - high_conf = len([t for t in tasks if t.confidence >= 0.8]) - medium_conf = len([t for t in tasks if 0.5 <= t.confidence < 0.8]) - low_conf = len([t for t in tasks if t.confidence < 0.5]) + high_conf = [t for t in tasks if t.confidence >= 0.8] + medium_conf = [t for t in tasks if 0.5 <= t.confidence < 0.8] + low_conf = [t for t in tasks if t.confidence < 0.5] - content += f"""- **High Confidence (≥80%):** {high_conf} tasks -- **Medium Confidence (50-79%):** {medium_conf} tasks -- **Low Confidence (<50%):** {low_conf} tasks - -*The confidence scores reflect the system's assessment of result quality and reliability based on evidence strength, consistency, and completeness.*""" - - return content - - def _generate_performance_metrics(self, workflow_result: Dict[str, Any], - problem_statement: str) -> str: - """Generate performance metrics section.""" - performance = workflow_result.get('performance_metrics', {}) - - if not performance: - return "" - - content = "## Performance Metrics\n\n" - - # Extract metrics - total_time = performance.get('total_execution_time', 0) - avg_task_time = performance.get('average_task_completion_time', 0) - total_collab = performance.get('total_collaborations', 0) - total_messages = performance.get('total_messages', 0) - efficiency = performance.get('efficiency_score', 0) - time_improvement = performance.get('time_improvement_percentage', 0) - - content += f"""
-
-### System Performance Overview
-
-| Metric | Value | Benchmark |
-|--------|-------|-----------|
-| Total Execution Time | {total_time:.1f}s | - |
-| Average Task Time | {avg_task_time:.1f}s | {Config.BENCHMARK_BASELINE['single_agent_time']:.1f}s |
-| Time Improvement | {time_improvement:.1f}% | Baseline |
-| Total Collaborations | {total_collab} | - |
-| Message Exchanges | {total_messages} | - |
-| Efficiency Score | {efficiency:.2%} | 50% |
-
-</div>
-
-### Performance Insights
-
-"""
-
-        # Add performance insights
-        if time_improvement > 30:
-            content += "- **Exceptional Performance**: The multi-agent system achieved significant time savings through parallel processing.\n"
-        elif time_improvement > 15:
-            content += "- **Good Performance**: The system demonstrated efficient task distribution and execution.\n"
-        else:
-            content += "- **Standard Performance**: The system completed tasks within expected parameters.\n"
-
-        if efficiency > 0.8:
-            content += "- **High Efficiency**: Excellent resource utilization and agent coordination.\n"
-        elif efficiency > 0.6:
-            content += "- **Moderate Efficiency**: Good balance between speed and quality.\n"
-        else:
-            content += "- **Efficiency Opportunity**: Consider optimizing agent workflows for better performance.\n"

        return content

@@ -2886,16 +1341,10 @@
    def _generate_footer(self, workflow_result: Dict[str, Any]) -> str:
        """Generate report footer."""
        execution_time = workflow_result.get('execution_time', 0)
        timestamp = workflow_result.get('timestamp', datetime.now())

        return f"""---
-<div>
-
-**Report Generation Details**
-Analysis completed in {execution_time:.1f} seconds
-Report generated at {timestamp.strftime('%B %d, %Y at %I:%M %p')}
-
-*Powered by {Config.COMPANY_NAME}*
-*Advanced Multi-Agent AI Collaboration System*
-
-</div>
""" +**Report Generation Details:** +- Analysis completed in {execution_time:.1f} seconds +- Report generated at {timestamp.strftime('%B %d, %Y at %I:%M %p')} +- Powered by Multi-Agent AI Collaboration System""" # Gradio Interface Functions def create_gradio_interface(): @@ -2909,27 +1358,22 @@ def create_gradio_interface(): # State variables current_workflow = None current_problem = "" - demo_mode = False - def initialize_agents(api_key: str, model: str = "gpt-4", use_demo: bool = False) -> str: + def initialize_agents(api_key: str, model: str = "gpt-4") -> str: """Initialize the multi-agent system.""" - nonlocal coordinator, demo_mode + nonlocal coordinator - demo_mode = use_demo - - if not use_demo and not api_key: - return "❌ Please provide an OpenAI API key or enable Demo Mode to initialize the agents." + if not api_key: + return "Please provide an OpenAI API key to initialize the agents." try: - # Initialize LLM only if not in demo mode - llm = None - if not use_demo and api_key: - llm = ChatOpenAI( - api_key=api_key, - model=model, - temperature=Config.TEMPERATURE, - max_tokens=Config.MAX_TOKENS - ) + # Initialize LLM + llm = ChatOpenAI( + api_key=api_key, + model=model, + temperature=Config.TEMPERATURE, + max_tokens=Config.MAX_TOKENS + ) # Create coordinator coordinator = CoordinatorAgent("Coordinator", llm) @@ -2946,48 +1390,42 @@ def create_gradio_interface(): coordinator.register_agent(critic) coordinator.register_agent(synthesizer) - mode_text = "Demo Mode" if use_demo else f"Live Mode ({model})" - return f"✅ Successfully initialized multi-agent system with {len(coordinator.agents)} agents in {mode_text}." + return f"Successfully initialized multi-agent system with {len(coordinator.agents)} agents." except Exception as e: logger.error(f"Error initializing agents: {str(e)}") - return f"❌ Error initializing agents: {str(e)}" + return f"Error initializing agents: {str(e)}" - async def analyze_problem(problem: str, execution_mode: str, use_template: bool = False) -> Tuple[str, Any, Any, Any, Any]: + async def analyze_problem(problem: str, execution_mode: str) -> Tuple[str, Any, Any, Any]: """Analyze a problem using the multi-agent system.""" nonlocal current_workflow, current_problem if not coordinator: - return "❌ Please initialize the agents first.", None, None, None, None + return "Please initialize the agents first.", None, None, None if not problem: - return "❌ Please enter a problem to analyze.", None, None, None, None + return "Please enter a problem to analyze.", None, None, None current_problem = problem try: - # Update status - status = "🔄 Decomposing problem into subtasks..." - - # Decompose problem - tasks = await coordinator.decompose_problem(problem, use_template=use_template) + # Decompose problem into tasks + status = "Decomposing problem into subtasks..." + tasks = await coordinator.decompose_problem(problem) if not tasks: - return "❌ Failed to decompose problem into tasks.", None, None, None, None + return "Failed to decompose problem into tasks.", None, None, None - # Update status - status = f"🔄 Executing {len(tasks)} tasks using {execution_mode} mode..." + # Execute workflow + status = f"Executing {len(tasks)} tasks using {execution_mode} mode..." 
parallel = execution_mode == "Parallel" - # Execute workflow current_workflow = await coordinator.execute_workflow(tasks, parallel=parallel) # Create visualizations - active_agents = list(coordinator.agents.keys()) - workflow_graph = visualizer.create_workflow_graph( current_workflow['workflow_graph'], - active_agents=active_agents + active_agents=list(coordinator.agents.keys()) ) timeline_chart = visualizer.create_task_timeline(tasks) @@ -2996,40 +1434,28 @@ def create_gradio_interface(): current_workflow['agent_contributions'] ) - performance_chart = visualizer.create_performance_comparison( - current_workflow['performance_metrics'] - ) - # Generate status summary success_rate = current_workflow['success_rate'] execution_time = current_workflow['execution_time'] - performance = current_workflow['performance_metrics'] - status = f"""✅ **Analysis completed successfully!** - -📊 **Results Summary:** + status = f"""Analysis completed successfully! + - Tasks executed: {len(tasks)} - Success rate: {success_rate:.0%} - Execution time: {execution_time:.1f} seconds -- Performance improvement: {performance.get('time_improvement_percentage', 0):.1f}% faster -- Agents involved: {len(coordinator.agents)} - -🤖 **Agent Activity:** -- Total collaborations: {performance.get('total_collaborations', 0)} -- Messages exchanged: {performance.get('total_messages', 0)} -- Efficiency score: {performance.get('efficiency_score', 0):.2%}""" +- Agents involved: {len(coordinator.agents)}""" - return status, workflow_graph, timeline_chart, confidence_heatmap, performance_chart + return status, workflow_graph, timeline_chart, confidence_heatmap except Exception as e: logger.error(f"Error analyzing problem: {str(e)}") - return f"❌ Error during analysis: {str(e)}", None, None, None, None + return f"Error during analysis: {str(e)}", None, None, None def generate_report(selected_sections: List[str]) -> str: """Generate a report from the current workflow results.""" if not current_workflow: - return "❌ No analysis results available. Please run an analysis first." + return "No analysis results available. Please run an analysis first." try: report = report_generator.generate_report( @@ -3042,502 +1468,229 @@ def create_gradio_interface(): except Exception as e: logger.error(f"Error generating report: {str(e)}") - return f"❌ Error generating report: {str(e)}" + return f"Error generating report: {str(e)}" def get_agent_details(agent_name: str) -> str: """Get detailed information about a specific agent.""" if not coordinator or agent_name not in coordinator.agents: - return "❌ Agent not found or system not initialized." + return "Agent not found or system not initialized." agent = coordinator.agents[agent_name] - status = agent.get_status_summary() - - details = f"""## Agent Profile: {agent.name} - -
- -**Role:** {agent.role.value} -**Status:** {'🟢 Active' if agent.active else '🔴 Inactive'} -**Completed Tasks:** {len(agent.completed_tasks)} -**Current Task:** {agent.current_task.description if agent.current_task else 'None'} -**Average Confidence:** {status['average_confidence']:.0%} -**Collaborations:** {status['collaboration_count']} - -
- -### Recent Task History -""" - - for i, task in enumerate(agent.completed_tasks[-5:], 1): - status_icon = "✅" if task.status == TaskStatus.COMPLETED else "❌" - exec_time = task.performance_metrics.get('execution_time', 0) - - details += f""" -**{i}. {status_icon} {task.id}** -- Description: {task.description} -- Confidence: {task.confidence:.0%} -- Execution Time: {exec_time:.1f}s -""" - - # Add performance insights - if agent.performance_tracker.metrics['task_completion_times']: - avg_time = np.mean(agent.performance_tracker.metrics['task_completion_times']) - details += f"\n### Performance Statistics\n" - details += f"- Average Task Time: {avg_time:.1f}s\n" - details += f"- Total Active Time: {sum(agent.performance_tracker.metrics['task_completion_times']):.1f}s\n" - - return details - - def get_workflow_insights() -> str: - """Get insights about the multi-agent system performance.""" - if not coordinator: - return "❌ System not initialized." - - insights = coordinator.get_workflow_insights() - - if insights.get('total_workflows_executed', 0) == 0: - return "📊 No workflow executions yet. Run an analysis to see performance insights." - - content = f"""## Workflow Insights + details = f"""## Agent: {agent.name} -
+**Role:** {agent.role.value} -### System Performance Overview +**Status:** {'Active' if agent.active else 'Inactive'} -- **Total Workflows:** {insights['total_workflows_executed']} -- **Average Execution Time:** {insights['average_execution_time']:.1f}s -- **Average Success Rate:** {insights['average_success_rate']:.0%} -- **Most Efficient Agent:** {insights.get('most_efficient_agent', 'N/A')} -- **Highest Quality Agent:** {insights.get('highest_quality_agent', 'N/A')} +**Completed Tasks:** {len(agent.completed_tasks)} -
+**Current Task:** {agent.current_task.description if agent.current_task else 'None'} -### Agent Efficiency Rankings +### Recent Tasks: """ - if insights.get('agent_efficiency'): - content += "\n| Agent | Avg Time/Task | Avg Confidence | Total Tasks |\n" - content += "|-------|---------------|----------------|-------------|\n" - - for agent, efficiency in insights['agent_efficiency'].items(): - content += f"| {agent} | {efficiency['avg_time_per_task']:.1f}s | " - content += f"{efficiency['avg_confidence']:.0%} | {efficiency['total_tasks']} |\n" + for task in agent.completed_tasks[-5:]: + details += f"\n- **{task.id}**: {task.description}" + details += f"\n - Status: {task.status.value}" + details += f"\n - Confidence: {task.confidence:.0%}" + details += f"\n - Execution Time: {(task.completed_at - task.created_at).total_seconds():.1f}s\n" - return content + return details # Create custom CSS for professional styling custom_css = """ .gradio-container { font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', sans-serif; - max-width: 1400px; - margin: 0 auto; - } - - .gr-button { - font-weight: 600; - transition: all 0.3s ease; } .gr-button-primary { - background-color: #3498db !important; - border-color: #3498db !important; + background-color: #2563eb !important; + border-color: #2563eb !important; } .gr-button-primary:hover { - background-color: #2980b9 !important; - border-color: #2980b9 !important; - transform: translateY(-1px); - box-shadow: 0 4px 12px rgba(52, 152, 219, 0.3); + background-color: #1d4ed8 !important; + border-color: #1d4ed8 !important; } - .gr-button-secondary { - background-color: #95a5a6 !important; - border-color: #95a5a6 !important; + .container { + max-width: 1200px; + margin: 0 auto; } h1 { - color: #2c3e50; + background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); + -webkit-background-clip: text; + -webkit-text-fill-color: transparent; + text-align: center; + font-size: 2.5rem; font-weight: 800; - margin-bottom: 0.5rem; + margin-bottom: 1rem; } h2 { - color: #34495e; + color: #1f2937; + font-size: 1.5rem; font-weight: 600; - margin-top: 1.5rem; - } - - h3 { - color: #34495e; - font-weight: 600; - } - - .gr-box { - border-radius: 8px; - border-color: #e0e0e0; + margin-top: 2rem; + margin-bottom: 1rem; } - .gr-padded { - padding: 20px; - } - - .gr-form { - background-color: #f8f9fa; + .status-box { + background-color: #f3f4f6; border-radius: 8px; - padding: 20px; - border: 1px solid #e0e0e0; - } - - .status-text { - font-size: 14px; - line-height: 1.6; - } - - .markdown-text { - line-height: 1.6; - color: #2c3e50; - } - - .markdown-text h1, .markdown-text h2, .markdown-text h3 { - margin-top: 1.5em; - margin-bottom: 0.75em; - } - - .markdown-text ul, .markdown-text ol { - margin-left: 1.5em; - margin-bottom: 1em; - } - - .markdown-text code { - background-color: #f0f0f0; - padding: 2px 4px; - border-radius: 3px; - font-size: 0.9em; - } - - .markdown-text pre { - background-color: #f0f0f0; - padding: 10px; - border-radius: 5px; - overflow-x: auto; - } - - .tab-nav { - background-color: #f8f9fa; - border-bottom: 2px solid #e0e0e0; - } - - /* Loading animation */ - @keyframes pulse { - 0% { opacity: 0.6; } - 50% { opacity: 1; } - 100% { opacity: 0.6; } - } - - .loading { - animation: pulse 1.5s ease-in-out infinite; + padding: 1rem; + margin: 1rem 0; + border-left: 4px solid #3b82f6; } """ # Create Gradio interface - with gr.Blocks( - title="Multi-Agent AI Collaboration System", - theme=gr.themes.Base(), - css=custom_css - ) as interface: 
 
     # Create Gradio interface
-    with gr.Blocks(
-        title="Multi-Agent AI Collaboration System",
-        theme=gr.themes.Base(),
-        css=custom_css
-    ) as interface:
+    with gr.Blocks(title="Multi-Agent AI Collaboration System",
+                   theme=gr.themes.Base(),
+                   css=custom_css) as interface:
 
-        # Header
-        gr.HTML("""
-            <div>
-                <h1>
-                    Multi-Agent AI Collaboration System
-                </h1>
-                <p>
-                    Advanced AI agents working together to solve complex problems through intelligent task decomposition and parallel processing
-                </p>
-            </div>
+ gr.Markdown(""" + # Multi-Agent AI Collaboration System + + Advanced AI system with specialized agents working together to solve complex problems through intelligent task decomposition and parallel processing. """) # System Configuration - with gr.Group(): - gr.Markdown("### 🔧 System Configuration") - - with gr.Row(): - with gr.Column(scale=3): - api_key_input = gr.Textbox( - label="OpenAI API Key", - placeholder="sk-...", - type="password", - info="Required for live mode. Leave empty for demo mode." - ) - with gr.Column(scale=1): - model_select = gr.Dropdown( - choices=["gpt-4", "gpt-3.5-turbo"], - value="gpt-4", - label="Model", - info="Select the LLM model" - ) - with gr.Column(scale=1): - demo_mode_checkbox = gr.Checkbox( - label="Demo Mode", - value=False, - info="Run without API key" - ) - with gr.Column(scale=1): - init_button = gr.Button( - "Initialize Agents", - variant="primary", - size="lg" - ) - - init_status = gr.Textbox( - label="Initialization Status", - interactive=False, - elem_classes=["status-text"] - ) + with gr.Row(): + with gr.Column(scale=3): + api_key_input = gr.Textbox( + label="OpenAI API Key", + placeholder="sk-...", + type="password" + ) + with gr.Column(scale=1): + model_select = gr.Dropdown( + choices=["gpt-4", "gpt-3.5-turbo"], + value="gpt-4", + label="Model" + ) + with gr.Column(scale=1): + init_button = gr.Button("Initialize Agents", variant="primary") + + init_status = gr.Textbox(label="Initialization Status", interactive=False) # Main tabs - with gr.Tabs() as tabs: + with gr.Tabs(): # Problem Analysis Tab - with gr.TabItem("🔍 Problem Analysis", id=1): - with gr.Group(): - gr.Markdown("### Enter a complex problem for multi-agent analysis") - - problem_input = gr.Textbox( - label="Problem Statement", - placeholder="Example: Analyze the potential impact of AI on healthcare delivery in the next 5 years", - lines=3, - info="Describe a complex problem that requires multiple perspectives" - ) - - with gr.Row(): - with gr.Column(scale=1): - execution_mode = gr.Radio( - choices=["Sequential", "Parallel"], - value="Parallel", - label="Execution Mode", - info="Parallel mode is faster but requires more resources" - ) - with gr.Column(scale=1): - use_template = gr.Checkbox( - label="Use Workflow Template", - value=True, - info="Automatically match to predefined workflows" - ) - with gr.Column(scale=2): - analyze_button = gr.Button( - "🚀 Analyze Problem", - variant="primary", - size="lg" - ) - - analysis_status = gr.Textbox( - label="Analysis Status", - interactive=False, - lines=8, - elem_classes=["status-text"] + with gr.TabItem("Problem Analysis"): + gr.Markdown("### Enter a complex problem for multi-agent analysis") + + problem_input = gr.Textbox( + label="Problem Statement", + placeholder="Example: Analyze the potential impact of AI on healthcare delivery in the next 5 years", + lines=3 + ) + + with gr.Row(): + execution_mode = gr.Radio( + choices=["Sequential", "Parallel"], + value="Parallel", + label="Execution Mode" ) + analyze_button = gr.Button("Analyze Problem", variant="primary") + + analysis_status = gr.Textbox( + label="Analysis Status", + interactive=False, + lines=5 + ) # Visualization outputs - with gr.Group(): - gr.Markdown("### 📊 Analysis Visualizations") - - with gr.Row(): - workflow_graph = gr.Plot(label="Agent Collaboration Network") - - with gr.Row(): - with gr.Column(): - timeline_chart = gr.Plot(label="Task Execution Timeline") - with gr.Column(): - confidence_heatmap = gr.Plot(label="Agent Performance Metrics") - - with gr.Row(): - 
-                        performance_chart = gr.Plot(label="Performance Comparison")
+                with gr.Row():
+                    workflow_graph = gr.Plot(label="Agent Collaboration Network")
+
+                with gr.Row():
+                    timeline_chart = gr.Plot(label="Task Execution Timeline")
+                    confidence_heatmap = gr.Plot(label="Agent Performance Metrics")
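+
+                # The three plots above start empty; the analyze_button click
+                # handler (wired up at the bottom of this function) fills them
+                # through its outputs list once an analysis completes.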
- """) + section_selector = gr.CheckboxGroup( + choices=[ + "executive_summary", + "task_analysis", + "agent_contributions", + "key_findings", + "recommendations", + "confidence_analysis" + ], + value=[ + "executive_summary", + "key_findings", + "recommendations" + ], + label="Select Report Sections" + ) - example_problems = [ - { - "title": "🏢 Business Strategy", - "problem": "Develop a comprehensive strategy for a traditional retail company to transition to e-commerce while maintaining customer loyalty and managing existing physical stores", - "description": "Complex business transformation requiring market analysis, risk assessment, and strategic planning" - }, - { - "title": "🔗 Technology Assessment", - "problem": "Evaluate the potential risks and benefits of implementing blockchain technology in supply chain management for a global manufacturing company", - "description": "Technical evaluation requiring understanding of emerging technology and business operations" - }, - { - "title": "🚗 Market Analysis", - "problem": "Analyze the competitive landscape for electric vehicles and identify key success factors for new entrants in the North American market", - "description": "Market research requiring industry analysis, competitor assessment, and trend identification" - }, - { - "title": "🏠 Policy Evaluation", - "problem": "Assess the implications of remote work policies on organizational culture, productivity, and talent retention in technology companies", - "description": "Organizational analysis requiring understanding of human resources, culture, and productivity metrics" - }, - { - "title": "🏥 Innovation Planning", - "problem": "Design an innovation framework for a healthcare organization to integrate AI-powered diagnostic tools while ensuring patient privacy and regulatory compliance", - "description": "Innovation strategy requiring technical, regulatory, and ethical considerations" - } - ] + generate_report_button = gr.Button("Generate Report", variant="primary") - example_buttons = [] - for i, example in enumerate(example_problems): - with gr.Group(): - gr.Markdown(f""" - #### {example['title']} - *{example['description']}* - """) - btn = gr.Button( - f"Load This Example", - variant="secondary", - size="sm" - ) - example_buttons.append((btn, example['problem'])) + report_output = gr.Markdown() - # Help Tab - with gr.TabItem("❓ Help", id=5): + # Example Problems Tab + with gr.TabItem("Example Problems"): gr.Markdown(""" - ## How to Use the Multi-Agent AI Collaboration System - - ### 🚀 Getting Started - - 1. **Initialize the System** - - Enter your OpenAI API key for live analysis (optional) - - Or enable Demo Mode to explore without an API key - - Select your preferred model (GPT-4 recommended) - - Click "Initialize Agents" - - 2. **Analyze a Problem** - - Enter a complex problem in the Problem Analysis tab - - Choose execution mode (Parallel is faster) - - Optionally use workflow templates for common problem types - - Click "Analyze Problem" + ### Example Problems for Analysis - 3. **Review Results** - - View the agent collaboration network - - Check the task execution timeline - - Review performance metrics - - Explore individual agent details + Click on any example to load it into the analysis tab: - 4. **Generate Report** - - Select desired report sections - - Click "Generate Report" - - Copy or save the comprehensive analysis + 1. 
        analyze_button.click(
-            fn=lambda p, m, t: asyncio.run(analyze_problem(p, m, t)),
-            inputs=[problem_input, execution_mode, use_template],
-            outputs=[analysis_status, workflow_graph, timeline_chart, confidence_heatmap, performance_chart]
+            fn=lambda p, m: asyncio.run(analyze_problem(p, m)),
+            inputs=[problem_input, execution_mode],
+            outputs=[analysis_status, workflow_graph, timeline_chart, confidence_heatmap]
        )
 
        agent_details_button.click(
@@ -3546,12 +1699,6 @@ def create_gradio_interface():
            outputs=agent_details_output
        )
 
-        insights_button.click(
-            fn=get_workflow_insights,
-            inputs=[],
-            outputs=insights_output
-        )
-
        generate_report_button.click(
            fn=generate_report,
            inputs=section_selector,
@@ -3559,13 +1706,13 @@
        )
 
        # Example button handlers
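+        # The `idx=i` default argument freezes the loop index at definition
+        # time; a bare `lambda: example_problems[i]` would late-bind `i` and
+        # make every button load the last example.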
-        for btn, problem in example_buttons:
+        for i, btn in enumerate(example_buttons):
             btn.click(
-                fn=lambda p=problem: p,
+                fn=lambda idx=i: example_problems[idx],
                 outputs=problem_input
-            ).then(
-                fn=lambda: gr.Tabs.update(selected=1),
-                outputs=tabs
             )
 
     return interface
@@ -3573,8 +1720,4 @@ def create_gradio_interface():
 # Main execution
 if __name__ == "__main__":
     interface = create_gradio_interface()
-    interface.launch(
-        share=True,
-        favicon_path=None,
-        show_error=True
-    )
\ No newline at end of file
+    interface.launch(share=True)
\ No newline at end of file