|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
import json |
|
import time |
|
import asyncio |
|
import hashlib |
|
import logging |
|
from datetime import datetime |
|
from typing import Dict, List, Tuple, Optional, Any, Union, Set |
|
from dataclasses import dataclass, field |
|
from enum import Enum |
|
import warnings |
|
warnings.filterwarnings('ignore') |
|
|
|
|
|
import gradio as gr |
|
import pandas as pd |
|
import numpy as np |
|
import networkx as nx |
|
import matplotlib.pyplot as plt |
|
from matplotlib.patches import FancyBboxPatch |
|
import plotly.graph_objects as go |
|
from plotly.subplots import make_subplots |
|
|
|
|
|
from langchain.schema import BaseMessage, HumanMessage, AIMessage, SystemMessage |
|
from langchain_openai import ChatOpenAI |
|
from langchain.callbacks.base import BaseCallbackHandler |
|
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder |
|
from langchain.memory import ConversationBufferMemory |
|
from pydantic import BaseModel, Field |
|
|
|
|
|
import aiohttp |
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
|
|
# Root logging setup: INFO and above, each record rendered as
# "<time> - <logger name> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
|
class Config:
    """Static settings for the multi-agent system.

    Everything is a plain class attribute read as ``Config.<NAME>``; the
    class is never instantiated in this file.
    """

    # --- Language model ---
    DEFAULT_MODEL = "gpt-4"
    TEMPERATURE = 0.7
    MAX_TOKENS = 1500

    # --- Agent collaboration ---
    MAX_ITERATIONS = 10
    COLLABORATION_TIMEOUT = 300  # NOTE(review): presumably seconds - confirm

    # --- Visualization ---
    GRAPH_UPDATE_INTERVAL = 0.5  # NOTE(review): presumably seconds - confirm
    # Hex colour per role; keys match the AgentRole values and are looked up
    # by WorkflowVisualizer when drawing nodes.
    NODE_COLORS = dict(
        Researcher='#3498db',
        Analyst='#e74c3c',
        Critic='#f39c12',
        Synthesizer='#2ecc71',
        Coordinator='#9b59b6',
    )

    # --- Reporting ---
    CONFIDENCE_THRESHOLD = 0.7
    MAX_REPORT_SECTIONS = 10
|
|
|
class AgentRole(Enum):
    """The roles an agent can play.

    Each value is the human-readable role string. The coordinator compares
    it with a task's ``suggested_role`` metadata, and it doubles as the key
    into ``Config.NODE_COLORS``.
    """

    # Specialist roles.
    RESEARCHER = "Researcher"
    ANALYST = "Analyst"
    CRITIC = "Critic"
    SYNTHESIZER = "Synthesizer"

    # Orchestrating role.
    COORDINATOR = "Coordinator"
|
|
|
class TaskStatus(Enum):
    """Lifecycle states of a task.

    A task starts as PENDING, becomes IN_PROGRESS when an agent picks it up,
    and ends as either COMPLETED or FAILED.
    """

    # Not yet finished.
    PENDING = "pending"
    IN_PROGRESS = "in_progress"

    # Terminal states.
    COMPLETED = "completed"
    FAILED = "failed"
|
|
|
@dataclass
class Task:
    """A unit of work for an agent, together with its execution bookkeeping.

    Only ``id`` and ``description`` are required; every other field has a
    default and is filled in as the task moves through the workflow.
    """

    # --- What has to be done ---
    id: str
    description: str

    # --- Who is doing it and how far along it is ---
    # Name of the agent processing the task; None until one picks it up.
    assigned_to: Optional[str] = None
    status: TaskStatus = TaskStatus.PENDING
    # Ids of tasks that should finish before this one is scheduled.
    dependencies: List[str] = field(default_factory=list)

    # --- Outcome ---
    # Agent output on success, or an "Error: ..." string on failure.
    result: Optional[Any] = None
    confidence: float = 0.0

    # --- Timing ---
    created_at: datetime = field(default_factory=datetime.now)
    # Set when the task completes successfully; stays None otherwise.
    completed_at: Optional[datetime] = None

    # Free-form extras; the coordinator stores "suggested_role" and
    # "original_problem" here.
    metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
@dataclass
class AgentMessage:
    """One message exchanged between two agents.

    ``sender``, ``recipient`` and ``content`` are required. The remaining
    fields default to a "task" message with empty metadata, stamped with the
    creation time.
    """

    # --- Routing ---
    sender: str
    recipient: str

    # --- Payload ---
    content: str
    # Message category; this file uses "task" (default) and "response".
    message_type: str = "task"
    metadata: Dict[str, Any] = field(default_factory=dict)

    # Creation time, taken from datetime.now() per instance.
    timestamp: datetime = field(default_factory=datetime.now)
|
|
|
class AgentMemory:
    """Bounded, ordered message history plus a free-form context dict.

    Messages are kept oldest-first in ``messages``. Once the history grows
    past ``max_messages`` the oldest entries are discarded.
    """

    def __init__(self, max_messages: int = 50):
        """Create an empty memory that retains at most ``max_messages`` messages."""
        self.messages: List[AgentMessage] = []
        self.max_messages = max_messages
        self.context: Dict[str, Any] = {}

    def add_message(self, message: AgentMessage):
        """Append ``message`` and drop the oldest entries beyond the cap."""
        self.messages.append(message)
        # Trim everything over the cap, not just one element, so the limit
        # still holds if ``max_messages`` was lowered after construction.
        overflow = len(self.messages) - self.max_messages
        if overflow > 0:
            del self.messages[:overflow]

    def get_recent_messages(self, n: int = 10) -> List[AgentMessage]:
        """Return the ``n`` most recent messages, oldest first.

        A non-positive ``n`` yields an empty list. Without this guard
        ``messages[-0:]`` would return the entire history.
        """
        if n <= 0:
            return []
        return self.messages[-n:]

    def get_messages_by_sender(self, sender: str) -> List[AgentMessage]:
        """Return every stored message whose ``sender`` equals ``sender``."""
        return [msg for msg in self.messages if msg.sender == sender]

    def update_context(self, key: str, value: Any):
        """Store ``value`` under ``key`` in the context dict."""
        self.context[key] = value

    def get_context(self, key: str) -> Any:
        """Return the context value for ``key``, or None if it is absent."""
        return self.context.get(key)
|
|
|
class BaseAgent:
    """Shared behaviour for every agent: task lifecycle, memory, collaboration.

    Subclasses provide the actual work by overriding ``_execute_task`` and may
    override ``_process_collaboration`` to give richer replies.
    """

    def __init__(self, name: str, role: AgentRole, llm: ChatOpenAI):
        """Store identity and LLM handle, and start with empty memory and history."""
        self.name = name
        self.role = role
        self.llm = llm
        self.memory = AgentMemory()
        self.active = True
        self.current_task: Optional[Task] = None
        self.completed_tasks: List[Task] = []

    async def process_task(self, task: Task) -> Task:
        """Run ``task`` through ``_execute_task`` and record the outcome on it.

        The task is mutated in place and returned. On success it receives the
        result, COMPLETED status, a completion time and a confidence score,
        and is appended to ``completed_tasks``. Any ``Exception`` is logged
        with its traceback and converted into a FAILED task whose result is
        an ``"Error: ..."`` string; it is never re-raised.
        """
        self.current_task = task
        task.status = TaskStatus.IN_PROGRESS
        task.assigned_to = self.name

        try:
            result = await self._execute_task(task)

            task.result = result
            task.status = TaskStatus.COMPLETED
            task.completed_at = datetime.now()
            task.confidence = self._calculate_confidence(result)

            self.completed_tasks.append(task)
        except Exception as e:
            # logger.exception keeps the traceback, which a plain error()
            # call with the stringified exception would lose.
            logger.exception(
                "Agent %s failed to process task %s: %s", self.name, task.id, e
            )
            task.status = TaskStatus.FAILED
            task.result = f"Error: {str(e)}"
            task.confidence = 0.0
        finally:
            self.current_task = None

        return task

    async def _execute_task(self, task: Task) -> Any:
        """Do the role-specific work for ``task``; subclasses must override."""
        raise NotImplementedError("Subclasses must implement _execute_task")

    def _calculate_confidence(self, result: Any) -> float:
        """Score ``result`` between 0.5 and 0.9 from the length of its text.

        Text longer than 50 characters scores ``0.5 + len / 1000`` capped at
        0.9; anything else scores 0.5. For a dict result (what the agent
        subclasses in this file return) the longest string value is used as
        the text, so structured results are no longer pinned at 0.5.
        """
        text = self._primary_text(result)
        if len(text) > 50:
            return min(0.9, 0.5 + len(text) / 1000)
        return 0.5

    @staticmethod
    def _primary_text(result: Any) -> str:
        """Return the text to score: the string itself, or a dict's longest string value."""
        if isinstance(result, str):
            return result
        if isinstance(result, dict):
            strings = (value for value in result.values() if isinstance(value, str))
            return max(strings, key=len, default="")
        return ""

    async def collaborate(self, other_agent: 'BaseAgent', message: AgentMessage):
        """Answer ``message`` and deliver the reply to ``other_agent``.

        The incoming message is stored in this agent's memory; the reply (a
        "response" AgentMessage) is stored in ``other_agent``'s memory and
        returned.
        """
        self.memory.add_message(message)

        response = await self._process_collaboration(message)

        response_message = AgentMessage(
            sender=self.name,
            recipient=other_agent.name,
            content=response,
            message_type="response",
        )
        other_agent.memory.add_message(response_message)

        return response_message

    async def _process_collaboration(self, message: AgentMessage) -> str:
        """Produce the reply text for ``message``; default is an acknowledgement."""
        return f"Acknowledged message from {message.sender}"
|
|
|
class ResearcherAgent(BaseAgent):
    """Agent that asks the LLM to research a topic and structures the answer."""

    _SYSTEM_PROMPT = (
        "You are a Research Agent specializing in gathering comprehensive information.\n"
        "Your role is to:\n"
        "1. Break down complex topics into research questions\n"
        "2. Identify key information sources and data points\n"
        "3. Provide detailed, factual information with citations where possible\n"
        "4. Flag areas requiring further investigation"
    )

    def __init__(self, name: str, llm: ChatOpenAI):
        """Create a researcher named ``name`` backed by ``llm``."""
        super().__init__(name, AgentRole.RESEARCHER, llm)
        self.research_sources: List[str] = []

    async def _execute_task(self, task: Task) -> Any:
        """Query the LLM about ``task.description`` and return a findings dict.

        The dict holds the raw reply under "findings" plus the keyword-based
        extractions "sources", "key_points" and "areas_for_investigation".
        """
        messages = ChatPromptTemplate.from_messages([
            SystemMessage(content=self._SYSTEM_PROMPT),
            HumanMessage(content=f"Research the following: {task.description}"),
        ]).format_messages()

        reply = await self.llm.ainvoke(messages)
        text = reply.content

        return {
            "findings": text,
            "sources": self._extract_sources(text),
            "key_points": self._extract_key_points(text),
            "areas_for_investigation": self._identify_gaps(text),
        }

    @staticmethod
    def _lines_mentioning(content: str, markers, limit: int) -> List[str]:
        """Return up to ``limit`` stripped lines containing any marker (case-insensitive)."""
        hits = [
            raw.strip()
            for raw in content.split('\n')
            if any(marker in raw.lower() for marker in markers)
        ]
        return hits[:limit]

    def _extract_sources(self, content: str) -> List[str]:
        """Return at most 5 lines that look like source citations."""
        markers = ['source:', 'reference:', 'based on:', 'according to']
        return self._lines_mentioning(content, markers, 5)

    def _extract_key_points(self, content: str) -> List[str]:
        """Return at most 10 lines that start with a digit or a dash."""
        stripped_lines = (raw.strip() for raw in content.split('\n'))
        bullets = [
            text for text in stripped_lines
            if text and (text[0].isdigit() or text.startswith('-'))
        ]
        return bullets[:10]

    def _identify_gaps(self, content: str) -> List[str]:
        """Return at most 5 lines that signal open questions."""
        markers = ['unclear', 'requires further', 'need more', 'investigate', 'unknown']
        return self._lines_mentioning(content, markers, 5)
|
|
|
class AnalystAgent(BaseAgent):
    """Agent that asks the LLM for an analysis and extracts structure from it."""

    _SYSTEM_PROMPT = (
        "You are an Analyst Agent specializing in data analysis and pattern recognition.\n"
        "Your role is to:\n"
        "1. Analyze information systematically and objectively\n"
        "2. Identify patterns, trends, and correlations\n"
        "3. Provide quantitative insights where possible\n"
        "4. Draw logical conclusions based on evidence"
    )

    def __init__(self, name: str, llm: ChatOpenAI):
        """Create an analyst named ``name`` backed by ``llm``."""
        super().__init__(name, AgentRole.ANALYST, llm)
        self.analysis_methods: List[str] = ["statistical", "comparative", "trend", "causal"]

    async def _execute_task(self, task: Task) -> Any:
        """Query the LLM about ``task.description`` and return an analysis dict.

        Keys: "analysis" (raw reply), "patterns", "insights",
        "recommendations" and "confidence_metrics".
        """
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content=self._SYSTEM_PROMPT),
            HumanMessage(content=f"Analyze the following: {task.description}"),
        ])

        response = await self.llm.ainvoke(prompt.format_messages())
        text = response.content

        return {
            "analysis": text,
            "patterns": self._identify_patterns(text),
            "insights": self._extract_insights(text),
            "recommendations": self._generate_recommendations(text),
            "confidence_metrics": self._calculate_analysis_confidence(text),
        }

    @staticmethod
    def _sentences_mentioning(content: str, keywords, limit: int) -> List[str]:
        """Return up to ``limit`` '.'-delimited sentences containing any keyword.

        Matching is a case-insensitive substring test; each hit is stripped
        and given back its trailing period.
        """
        hits = []
        for sentence in content.split('.'):
            lowered = sentence.lower()
            if any(keyword in lowered for keyword in keywords):
                hits.append(sentence.strip() + '.')
        return hits[:limit]

    def _identify_patterns(self, content: str) -> List[Dict[str, str]]:
        """Return at most 5 lines mentioning a pattern, each with a coarse type."""
        pattern_keywords = ['pattern', 'trend', 'correlation', 'relationship', 'consistent']
        patterns = [
            {"description": line.strip(), "type": self._classify_pattern(line)}
            for line in content.split('\n')
            if any(keyword in line.lower() for keyword in pattern_keywords)
        ]
        return patterns[:5]

    def _classify_pattern(self, description: str) -> str:
        """Label ``description`` as 'trend', 'correlation', 'cyclical' or 'general'."""
        description_lower = description.lower()
        if 'trend' in description_lower:
            return 'trend'
        if 'correlation' in description_lower:
            return 'correlation'
        if 'cycle' in description_lower or 'periodic' in description_lower:
            return 'cyclical'
        return 'general'

    def _extract_insights(self, content: str) -> List[str]:
        """Return at most 7 sentences that state a finding."""
        insight_keywords = ['shows', 'indicates', 'suggests', 'reveals', 'demonstrates']
        return self._sentences_mentioning(content, insight_keywords, 7)

    def _generate_recommendations(self, content: str) -> List[str]:
        """Return at most 5 sentences phrased as advice."""
        rec_keywords = ['recommend', 'suggest', 'should', 'consider', 'advise']
        return self._sentences_mentioning(content, rec_keywords, 5)

    def _calculate_analysis_confidence(self, content: str) -> Dict[str, float]:
        """Derive heuristic confidence metrics from word counts in ``content``.

        Evidence words raise the score, hedging words lower it, and length
        adds a small bonus. All three returned values are kept within
        ``[0, 1]``; ``overall_confidence`` is capped at 0.95 and floored at
        0.0 so heavily hedged text cannot produce a negative confidence.
        Counts are substring counts, so e.g. "maybe" also counts as "may".
        """
        lowered = content.lower()  # lower-case once instead of per count
        word_count = len(content.split())
        evidence_count = sum(lowered.count(term) for term in ('evidence', 'data', 'shows'))
        uncertainty_count = sum(lowered.count(term) for term in ('may', 'might', 'possibly'))

        raw_confidence = 0.5 + (evidence_count * 0.1) - (uncertainty_count * 0.05) + (word_count / 1000)
        confidence = max(0.0, min(0.95, raw_confidence))

        return {
            "overall_confidence": confidence,
            "evidence_strength": min(1.0, evidence_count / 10),
            "certainty_level": max(0.0, 1.0 - (uncertainty_count / 10)),
        }
|
|
|
class CriticAgent(BaseAgent):
    """Agent that asks the LLM for a critique and extracts structure from it."""

    _SYSTEM_PROMPT = (
        "You are a Critic Agent specializing in rigorous evaluation and quality assurance.\n"
        "Your role is to:\n"
        "1. Critically evaluate arguments and conclusions\n"
        "2. Identify weaknesses, gaps, and potential biases\n"
        "3. Verify logical consistency and evidence quality\n"
        "4. Suggest improvements and alternative perspectives\n"
        "5. Ensure high standards of analysis"
    )

    def __init__(self, name: str, llm: ChatOpenAI):
        """Create a critic named ``name`` backed by ``llm``."""
        super().__init__(name, AgentRole.CRITIC, llm)
        self.evaluation_criteria = [
            "accuracy", "completeness", "logic", "evidence",
            "clarity", "relevance", "consistency",
        ]

    async def _execute_task(self, task: Task) -> Any:
        """Query the LLM about ``task.description`` and return a critique dict.

        Keys: "evaluation" (raw reply), "strengths", "weaknesses", "gaps",
        "improvements" and "quality_score".
        """
        messages = ChatPromptTemplate.from_messages([
            SystemMessage(content=self._SYSTEM_PROMPT),
            HumanMessage(content=f"Critically evaluate the following: {task.description}"),
        ]).format_messages()

        reply = await self.llm.ainvoke(messages)
        text = reply.content

        return {
            "evaluation": text,
            "strengths": self._identify_strengths(text),
            "weaknesses": self._identify_weaknesses(text),
            "gaps": self._identify_gaps(text),
            "improvements": self._suggest_improvements(text),
            "quality_score": self._calculate_quality_score(text),
        }

    @staticmethod
    def _sentences_mentioning(content: str, keywords, limit: int) -> List[str]:
        """Return up to ``limit`` '.'-delimited sentences containing any keyword.

        Matching is a case-insensitive substring test; each hit is stripped
        and given back its trailing period.
        """
        hits = []
        for sentence in content.split('.'):
            lowered = sentence.lower()
            if any(word in lowered for word in keywords):
                hits.append(sentence.strip() + '.')
        return hits[:limit]

    def _identify_strengths(self, content: str) -> List[str]:
        """Return at most 5 sentences that use praising vocabulary."""
        return self._sentences_mentioning(
            content, ['strong', 'excellent', 'well', 'good', 'effective', 'solid'], 5
        )

    def _identify_weaknesses(self, content: str) -> List[str]:
        """Return at most 5 sentences that use fault-finding vocabulary."""
        return self._sentences_mentioning(
            content, ['weak', 'lack', 'insufficient', 'poor', 'inadequate', 'missing'], 5
        )

    def _identify_gaps(self, content: str) -> List[str]:
        """Return at most 5 sentences that point at something left out."""
        return self._sentences_mentioning(
            content, ['gap', 'missing', 'overlook', 'fail to', 'does not address', 'ignores'], 5
        )

    def _suggest_improvements(self, content: str) -> List[str]:
        """Return at most 5 sentences that propose a change."""
        return self._sentences_mentioning(
            content, ['could', 'should', 'improve', 'enhance', 'strengthen', 'add'], 5
        )

    def _calculate_quality_score(self, content: str) -> Dict[str, float]:
        """Score each evaluation criterion in [0, 1] and add their mean as 'overall'.

        Every criterion starts at 0.5, gains 0.1 per occurrence of the
        criterion name, "good" or "strong", and loses 0.15 per occurrence of
        "poor", "weak" or "lacking" (substring counts, case-insensitive).
        """
        lowered = content.lower()
        # These two tallies are the same for every criterion.
        praise_count = lowered.count('good') + lowered.count('strong')
        negative_count = lowered.count('poor') + lowered.count('weak') + lowered.count('lacking')

        scores = {}
        for criterion in self.evaluation_criteria:
            positive_count = lowered.count(criterion) + praise_count
            raw_score = 0.5 + (positive_count * 0.1) - (negative_count * 0.15)
            scores[criterion] = min(1.0, max(0.0, raw_score))

        # Mean over the criteria only; computed before 'overall' is inserted.
        mean_score = sum(scores.values()) / len(scores)
        scores['overall'] = mean_score

        return scores
|
|
|
class SynthesizerAgent(BaseAgent):
    """Agent that asks the LLM for a synthesis and extracts structure from it."""

    _SYSTEM_PROMPT = (
        "You are a Synthesizer Agent specializing in integrating diverse information.\n"
        "Your role is to:\n"
        "1. Combine multiple perspectives into coherent narratives\n"
        "2. Resolve contradictions and find common ground\n"
        "3. Create comprehensive summaries that capture key insights\n"
        "4. Generate actionable conclusions and recommendations\n"
        "5. Ensure clarity and accessibility of complex information"
    )

    def __init__(self, name: str, llm: ChatOpenAI):
        """Create a synthesizer named ``name`` backed by ``llm``."""
        super().__init__(name, AgentRole.SYNTHESIZER, llm)
        self.synthesis_strategies = ["integrate", "summarize", "reconcile", "consolidate"]

    async def _execute_task(self, task: Task) -> Any:
        """Query the LLM about ``task.description`` and return a synthesis dict.

        Keys: "synthesis" (raw reply), "key_themes", "consensus_points",
        "contradictions", "final_recommendations" and "executive_summary".
        """
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content=self._SYSTEM_PROMPT),
            HumanMessage(content=f"Synthesize the following information: {task.description}"),
        ])

        response = await self.llm.ainvoke(prompt.format_messages())
        text = response.content

        return {
            "synthesis": text,
            "key_themes": self._extract_themes(text),
            "consensus_points": self._identify_consensus(text),
            "contradictions": self._identify_contradictions(text),
            "final_recommendations": self._generate_final_recommendations(text),
            "executive_summary": self._create_executive_summary(text),
        }

    @staticmethod
    def _sentences_mentioning(content: str, keywords, limit: int) -> List[str]:
        """Return up to ``limit`` '.'-delimited sentences containing any keyword.

        Matching is a case-insensitive substring test; each hit is stripped
        and given back its trailing period.
        """
        hits = []
        for sentence in content.split('.'):
            lowered = sentence.lower()
            if any(keyword in lowered for keyword in keywords):
                hits.append(sentence.strip() + '.')
        return hits[:limit]

    def _extract_themes(self, content: str) -> List[Dict[str, str]]:
        """Return themes found among the first five blank-line-separated paragraphs.

        A paragraph counts as a theme when it contains a theme keyword. Its
        label is "Theme <n>" where n is the paragraph's 1-based position, and
        its description is the stripped paragraph cut to 200 characters plus
        "..." when longer.
        """
        theme_keywords = ['theme', 'pattern', 'trend', 'common', 'recurring', 'central']

        themes = []
        for i, paragraph in enumerate(content.split('\n\n')[:5]):
            if not any(keyword in paragraph.lower() for keyword in theme_keywords):
                continue
            # Measure the stripped text, so surrounding whitespace alone can
            # never trigger a truncation marker on a short paragraph.
            description = paragraph.strip()
            if len(description) > 200:
                description = description[:200] + "..."
            themes.append({"theme": f"Theme {i+1}", "description": description})

        return themes

    def _identify_consensus(self, content: str) -> List[str]:
        """Return at most 5 sentences that signal agreement."""
        consensus_keywords = ['agree', 'consensus', 'common', 'shared', 'unanimous', 'consistent']
        return self._sentences_mentioning(content, consensus_keywords, 5)

    def _identify_contradictions(self, content: str) -> List[str]:
        """Return at most 3 sentences that signal disagreement."""
        conflict_keywords = ['however', 'contrary', 'conflict', 'disagree', 'opposing', 'contradicts']
        return self._sentences_mentioning(content, conflict_keywords, 3)

    def _generate_final_recommendations(self, content: str) -> List[str]:
        """Return up to 5 non-blank lines following a recommendation/conclusion heading.

        A line containing "recommend" or "conclusion" opens the section and
        is itself not collected; this also applies to such lines inside the
        section.
        """
        recommendations = []
        in_recommendations = False

        for line in content.split('\n'):
            lowered = line.lower()
            if 'recommend' in lowered or 'conclusion' in lowered:
                in_recommendations = True
            elif in_recommendations and line.strip():
                recommendations.append(line.strip())
                if len(recommendations) >= 5:
                    break

        return recommendations

    def _create_executive_summary(self, content: str) -> str:
        """Return the first non-blank paragraph, cut to 300 characters.

        "..." is appended when the paragraph was longer than 300 characters.
        When ``content`` has no non-blank paragraph the placeholder
        "Summary generation in progress..." is returned. ``str.split`` never
        yields an empty list, so the placeholder has to be chosen explicitly.
        """
        first_paragraph = next(
            (paragraph for paragraph in content.split('\n\n') if paragraph.strip()),
            None,
        )
        if first_paragraph is None:
            return "Summary generation in progress..."

        summary = first_paragraph[:300]
        if len(first_paragraph) > 300:
            summary += "..."
        return summary
|
|
|
class CoordinatorAgent(BaseAgent):
    """Agent that splits a problem into tasks and drives other agents through them."""

    # Pipeline order used to derive task dependencies between roles.
    _ROLE_ORDER = ("Researcher", "Analyst", "Critic", "Synthesizer")

    _SYSTEM_PROMPT = (
        "You are a Coordinator Agent responsible for breaking down complex problems.\n"
        "Decompose the problem into specific subtasks that can be assigned to specialized agents:\n"
        "- Researcher: For gathering information and facts\n"
        "- Analyst: For analyzing data and identifying patterns\n"
        "- Critic: For evaluating quality and identifying issues\n"
        "- Synthesizer: For combining insights and creating summaries\n"
        "\n"
        "Create clear, actionable subtasks with dependencies."
    )

    def __init__(self, name: str, llm: ChatOpenAI):
        """Create a coordinator with no registered agents and an empty workflow graph."""
        super().__init__(name, AgentRole.COORDINATOR, llm)
        self.agents: Dict[str, BaseAgent] = {}
        self.task_queue: List[Task] = []
        self.completed_tasks: List[Task] = []
        self.workflow_graph = nx.DiGraph()
        self.execution_history: List[Dict[str, Any]] = []

    def register_agent(self, agent: BaseAgent):
        """Make ``agent`` available for task assignment and add it to the graph."""
        self.agents[agent.name] = agent
        self.workflow_graph.add_node(agent.name, role=agent.role.value)

    async def decompose_problem(self, problem: str) -> List[Task]:
        """Ask the LLM to split ``problem`` into subtasks and parse them into Tasks."""
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content=self._SYSTEM_PROMPT),
            HumanMessage(content=f"Decompose this problem into subtasks: {problem}"),
        ])

        response = await self.llm.ainvoke(prompt.format_messages())

        return self._parse_tasks(response.content, problem)

    def _parse_tasks(self, content: str, original_problem: str) -> List[Task]:
        """Turn the LLM's decomposition text into Task objects.

        A line mentioning a role name switches the role applied from that
        line on. Lines starting with a digit or a dash become tasks once a
        role is known; the leading digits, dots and dashes are removed from
        the description. Lines left with an empty description are ignored.
        If nothing could be parsed, a default four-step
        research/analyze/critique/synthesize plan is returned.
        """
        tasks: List[Task] = []
        current_role = None

        for raw_line in content.split('\n'):
            line = raw_line.strip()
            if not line:
                continue

            for role in AgentRole:
                if role.value in line:
                    current_role = role.value
                    break

            if not current_role or not (line[0].isdigit() or line.startswith('-')):
                continue

            task_desc = line.lstrip('0123456789.-').strip()
            if not task_desc:
                # Separator lines such as "---" carry no task text.
                continue

            tasks.append(Task(
                id=f"task_{len(tasks) + 1}",
                description=task_desc,
                metadata={
                    "original_problem": original_problem,
                    "suggested_role": current_role,
                },
            ))

        if tasks:
            return tasks

        # Fallback plan; carries the same metadata keys as parsed tasks.
        fallback_plan = [
            ("Researcher", f"Research background information on: {original_problem}"),
            ("Analyst", f"Analyze key aspects of: {original_problem}"),
            ("Critic", "Critically evaluate the research and analysis"),
            ("Synthesizer", "Synthesize all findings into actionable insights"),
        ]
        return [
            Task(
                id=f"task_{number}",
                description=description,
                metadata={
                    "original_problem": original_problem,
                    "suggested_role": role_name,
                },
            )
            for number, (role_name, description) in enumerate(fallback_plan, start=1)
        ]

    async def execute_workflow(self, tasks: List[Task], parallel: bool = True) -> Dict[str, Any]:
        """Run ``tasks`` through the registered agents and summarise the run.

        Dependencies are derived first, then tasks are executed in parallel
        batches or one by one depending on ``parallel``. The returned summary
        dict is also appended to ``execution_history``.
        """
        start_time = datetime.now()

        self._build_dependency_graph(tasks)

        if parallel:
            await self._execute_parallel(tasks)
        else:
            await self._execute_sequential(tasks)

        end_time = datetime.now()
        execution_time = (end_time - start_time).total_seconds()

        workflow_result = {
            "tasks": tasks,
            "execution_time": execution_time,
            "success_rate": self._calculate_success_rate(tasks),
            "agent_contributions": self._compile_agent_contributions(tasks),
            "workflow_graph": self.workflow_graph,
            "timestamp": datetime.now(),
        }

        self.execution_history.append(workflow_result)

        return workflow_result

    def _build_dependency_graph(self, tasks: List[Task]):
        """Make each role's tasks depend on the tasks of the preceding role.

        Roles follow ``_ROLE_ORDER``; tasks without a suggested role count as
        Researcher tasks. Roles with no tasks are skipped, so the chain is
        not broken when an intermediate role is absent. Ids already present
        in ``dependencies`` are not added again, which keeps repeated calls
        on the same tasks from duplicating entries.
        """
        tasks_by_role: Dict[str, List[Task]] = {}
        for task in tasks:
            role = task.metadata.get("suggested_role", "Researcher")
            tasks_by_role.setdefault(role, []).append(task)

        staged_roles = [role for role in self._ROLE_ORDER if role in tasks_by_role]
        for upstream_role, downstream_role in zip(staged_roles, staged_roles[1:]):
            upstream_ids = [task.id for task in tasks_by_role[upstream_role]]
            for downstream_task in tasks_by_role[downstream_role]:
                for task_id in upstream_ids:
                    if task_id not in downstream_task.dependencies:
                        downstream_task.dependencies.append(task_id)

    def _mark_unassignable(self, task: Task) -> None:
        """Fail ``task`` because no registered agent can take it."""
        logger.error(
            "No agent available for task %s (suggested role: %s)",
            task.id, task.metadata.get("suggested_role"),
        )
        task.status = TaskStatus.FAILED
        task.result = "Error: no agent available to process this task"
        task.confidence = 0.0

    async def _execute_parallel(self, tasks: List[Task]) -> List[Task]:
        """Execute tasks in batches, running all dependency-free tasks concurrently.

        A task is ready once all its dependencies have been attempted; failed
        tasks also unblock their dependents. Tasks for which no agent can be
        selected are marked FAILED and removed from the queue. Previously
        they stayed pending forever and the loop never terminated.
        """
        completed: Set[str] = set()
        pending = list(tasks)

        while pending:
            ready_tasks = [
                task for task in pending
                if all(dep in completed for dep in task.dependencies)
            ]
            if not ready_tasks:
                # Dependencies can never be met (cycle or unknown id): force
                # progress with the first pending task.
                ready_tasks = [pending[0]]

            coroutines = []
            for task in ready_tasks:
                agent_name = self._select_agent_for_task(task)
                if agent_name and agent_name in self.agents:
                    coroutines.append(self.agents[agent_name].process_task(task))
                    self.workflow_graph.add_edge(
                        self.name, agent_name,
                        task_id=task.id,
                        timestamp=datetime.now(),
                    )
                else:
                    self._mark_unassignable(task)

            if coroutines:
                # process_task converts exceptions into FAILED tasks, so
                # gather is not expected to raise for ordinary failures.
                await asyncio.gather(*coroutines)

            # Every task in the batch has now been processed or failed.
            finished = {id(task) for task in ready_tasks}
            for task in ready_tasks:
                completed.add(task.id)
                self.completed_tasks.append(task)
            pending = [task for task in pending if id(task) not in finished]

        return tasks

    async def _execute_sequential(self, tasks: List[Task]) -> List[Task]:
        """Execute tasks one after another in the given order.

        Tasks for which no agent can be selected are marked FAILED.
        """
        for task in tasks:
            agent_name = self._select_agent_for_task(task)
            if agent_name and agent_name in self.agents:
                await self.agents[agent_name].process_task(task)
                self.workflow_graph.add_edge(
                    self.name, agent_name,
                    task_id=task.id,
                    timestamp=datetime.now(),
                )
            else:
                self._mark_unassignable(task)

            self.completed_tasks.append(task)

        return tasks

    def _select_agent_for_task(self, task: Task) -> Optional[str]:
        """Return the name of the agent that should handle ``task``.

        The first registered agent whose role matches the task's
        ``suggested_role`` wins; otherwise the first registered agent is used.
        Returns None when no agents are registered.
        """
        suggested_role = task.metadata.get("suggested_role")

        for agent_name, agent in self.agents.items():
            if agent.role.value == suggested_role:
                return agent_name

        return next(iter(self.agents), None)

    def _calculate_success_rate(self, tasks: List[Task]) -> float:
        """Return the fraction of ``tasks`` that are COMPLETED (0.0 for no tasks)."""
        if not tasks:
            return 0.0

        successful = sum(1 for task in tasks if task.status == TaskStatus.COMPLETED)
        return successful / len(tasks)

    def _compile_agent_contributions(self, tasks: List[Task]) -> Dict[str, Any]:
        """Summarise, per registered agent, the tasks assigned to it.

        ``tasks_completed`` counts every task assigned to the agent,
        ``average_confidence`` is the mean task confidence (0.0 without
        tasks), and ``total_execution_time`` sums creation-to-completion
        seconds over the tasks that have a completion time.
        """
        contributions = {}

        for agent_name, agent in self.agents.items():
            agent_tasks = [task for task in tasks if task.assigned_to == agent_name]

            if agent_tasks:
                average_confidence = float(np.mean([task.confidence for task in agent_tasks]))
            else:
                average_confidence = 0.0

            contributions[agent_name] = {
                "role": agent.role.value,
                "tasks_completed": len(agent_tasks),
                "average_confidence": average_confidence,
                "total_execution_time": sum(
                    (task.completed_at - task.created_at).total_seconds()
                    for task in agent_tasks
                    if task.completed_at
                ),
            }

        return contributions
|
|
|
class WorkflowVisualizer: |
|
"""Handles visualization of agent interactions and workflow.""" |
|
|
|
def __init__(self): |
|
self.color_map = Config.NODE_COLORS |
|
self.layout_cache = {} |
|
|
|
def create_workflow_graph(self, workflow_graph: nx.DiGraph, |
|
active_agents: List[str] = None) -> go.Figure: |
|
"""Create an interactive workflow visualization.""" |
|
|
|
|
|
pos = self._hierarchical_layout(workflow_graph) |
|
|
|
|
|
edge_trace = self._create_edge_trace(workflow_graph, pos) |
|
|
|
|
|
node_trace = self._create_node_trace(workflow_graph, pos, active_agents) |
|
|
|
|
|
fig = go.Figure(data=[edge_trace, node_trace], |
|
layout=go.Layout( |
|
title='Agent Collaboration Network', |
|
titlefont_size=16, |
|
showlegend=False, |
|
hovermode='closest', |
|
margin=dict(b=20, l=5, r=5, t=40), |
|
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), |
|
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), |
|
plot_bgcolor='white' |
|
)) |
|
|
|
return fig |
|
|
|
def _hierarchical_layout(self, G: nx.DiGraph) -> Dict: |
|
"""Create hierarchical layout for the graph.""" |
|
if len(G) == 0: |
|
return {} |
|
|
|
|
|
graph_hash = hash(tuple(G.edges())) |
|
if graph_hash in self.layout_cache: |
|
return self.layout_cache[graph_hash] |
|
|
|
|
|
hierarchy = { |
|
'Coordinator': 0, |
|
'Researcher': 1, |
|
'Analyst': 2, |
|
'Critic': 3, |
|
'Synthesizer': 4 |
|
} |
|
|
|
|
|
pos = {} |
|
role_counts = {} |
|
|
|
for node in G.nodes(): |
|
role = G.nodes[node].get('role', 'Unknown') |
|
level = hierarchy.get(role, 5) |
|
|
|
if level not in role_counts: |
|
role_counts[level] = 0 |
|
|
|
x = role_counts[level] * 2 - (len([n for n in G.nodes() if hierarchy.get(G.nodes[n].get('role'), 5) == level]) - 1) |
|
y = -level * 2 |
|
|
|
pos[node] = (x, y) |
|
role_counts[level] += 1 |
|
|
|
|
|
self.layout_cache[graph_hash] = pos |
|
|
|
return pos |
|
|
|
def _create_edge_trace(self, G: nx.DiGraph, pos: Dict) -> go.Scatter: |
|
"""Create edge trace for the graph.""" |
|
edge_x = [] |
|
edge_y = [] |
|
|
|
for edge in G.edges(): |
|
x0, y0 = pos.get(edge[0], (0, 0)) |
|
x1, y1 = pos.get(edge[1], (0, 0)) |
|
edge_x.extend([x0, x1, None]) |
|
edge_y.extend([y0, y1, None]) |
|
|
|
edge_trace = go.Scatter( |
|
x=edge_x, y=edge_y, |
|
line=dict(width=2, color='#888'), |
|
hoverinfo='none', |
|
mode='lines' |
|
) |
|
|
|
return edge_trace |
|
|
|
def _create_node_trace(self, G: nx.DiGraph, pos: Dict, |
|
active_agents: List[str] = None) -> go.Scatter: |
|
"""Create node trace for the graph.""" |
|
node_x = [] |
|
node_y = [] |
|
node_colors = [] |
|
node_sizes = [] |
|
node_text = [] |
|
|
|
for node in G.nodes(): |
|
x, y = pos.get(node, (0, 0)) |
|
node_x.append(x) |
|
node_y.append(y) |
|
|
|
|
|
role = G.nodes[node].get('role', 'Unknown') |
|
color = self.color_map.get(role, '#666') |
|
|
|
|
|
if active_agents and node in active_agents: |
|
size = 30 |
|
color = self._brighten_color(color) |
|
else: |
|
size = 20 |
|
|
|
node_colors.append(color) |
|
node_sizes.append(size) |
|
|
|
|
|
degree = G.degree(node) |
|
hover_text = f"{node}<br>Role: {role}<br>Connections: {degree}" |
|
node_text.append(hover_text) |
|
|
|
node_trace = go.Scatter( |
|
x=node_x, y=node_y, |
|
mode='markers+text', |
|
hoverinfo='text', |
|
text=[node for node in G.nodes()], |
|
textposition="top center", |
|
hovertext=node_text, |
|
marker=dict( |
|
showscale=False, |
|
color=node_colors, |
|
size=node_sizes, |
|
line=dict(color='white', width=2) |
|
) |
|
) |
|
|
|
return node_trace |
|
|
|
def _brighten_color(self, color: str) -> str: |
|
"""Make a color brighter for highlighting.""" |
|
|
|
if color.startswith('#'): |
|
r = int(color[1:3], 16) |
|
g = int(color[3:5], 16) |
|
b = int(color[5:7], 16) |
|
|
|
|
|
r = int(r + (255 - r) * 0.3) |
|
g = int(g + (255 - g) * 0.3) |
|
b = int(b + (255 - b) * 0.3) |
|
|
|
return f"#{r:02x}{g:02x}{b:02x}" |
|
|
|
return color |
|
|
|
def create_task_timeline(self, tasks: List[Task]) -> go.Figure: |
|
"""Create a timeline visualization of task execution.""" |
|
|
|
|
|
timeline_data = [] |
|
|
|
for task in tasks: |
|
if task.created_at and task.completed_at: |
|
timeline_data.append({ |
|
'Task': task.id, |
|
'Agent': task.assigned_to or 'Unassigned', |
|
'Start': task.created_at, |
|
'Finish': task.completed_at, |
|
'Status': task.status.value, |
|
'Confidence': task.confidence |
|
}) |
|
|
|
if not timeline_data: |
|
|
|
fig = go.Figure() |
|
fig.add_annotation( |
|
text="No completed tasks to display", |
|
xref="paper", yref="paper", |
|
x=0.5, y=0.5, |
|
showarrow=False |
|
) |
|
return fig |
|
|
|
df = pd.DataFrame(timeline_data) |
|
|
|
|
|
fig = px.timeline( |
|
df, |
|
x_start="Start", |
|
x_end="Finish", |
|
y="Agent", |
|
color="Confidence", |
|
hover_data=["Task", "Status", "Confidence"], |
|
title="Task Execution Timeline", |
|
color_continuous_scale="Viridis" |
|
) |
|
|
|
fig.update_yaxis(categoryorder="total ascending") |
|
fig.update_layout(height=400) |
|
|
|
return fig |
|
|
|
def create_confidence_heatmap(self, agent_contributions: Dict[str, Any]) -> go.Figure:
    """Create a heatmap of per-agent performance metrics.

    One column is drawn per agent and one row per metric (tasks completed,
    average confidence, execution time). Missing metrics count as 0 and the
    execution time is divided by 60 to match its ``Time (min)`` row label.

    Args:
        agent_contributions: Mapping of agent name to a stats mapping that
            may contain ``tasks_completed``, ``average_confidence`` and
            ``total_execution_time``.

    Returns:
        A Plotly figure holding a single annotated heatmap trace.
    """
    agent_names = list(agent_contributions.keys())
    metric_keys = ['tasks_completed', 'average_confidence', 'total_execution_time']

    def cell(agent_name, metric_key):
        # Look up one heatmap cell, scaling the time metric by 1/60.
        raw = agent_contributions[agent_name].get(metric_key, 0)
        if metric_key == 'total_execution_time':
            return raw / 60
        return raw

    # Rows follow metric_keys, columns follow agent_names.
    matrix = [
        [cell(agent_name, metric_key) for agent_name in agent_names]
        for metric_key in metric_keys
    ]

    heatmap = go.Heatmap(
        z=matrix,
        x=agent_names,
        y=['Tasks Completed', 'Avg Confidence', 'Time (min)'],
        colorscale='Blues',
        text=np.round(matrix, 2),
        texttemplate='%{text}',
        textfont={"size": 10}
    )

    fig = go.Figure(data=heatmap)
    fig.update_layout(
        title="Agent Performance Metrics",
        xaxis_title="Agents",
        yaxis_title="Metrics",
        height=300
    )
    return fig
|
|
|
class ReportGenerator:
    """Generates comprehensive Markdown reports from multi-agent collaboration.

    Every section is rendered by a ``_generate_*`` method with the signature
    ``(workflow_result, problem_statement) -> str``; ``section_generators``
    maps each public section name to its renderer.

    The renderers read these ``workflow_result`` keys, all optional:
    ``tasks``, ``success_rate``, ``execution_time``, ``agent_contributions``
    and ``timestamp``. Task objects are accessed through the attributes
    ``status``, ``assigned_to``, ``description``, ``confidence``, ``result``
    and ``metadata``.
    """

    # Keywords that place a recommendation in a priority bucket. Matching is a
    # case-insensitive substring test; the high bucket is checked first.
    _HIGH_PRIORITY_WORDS = ('critical', 'must', 'essential', 'immediately')
    _MEDIUM_PRIORITY_WORDS = ('should', 'recommend', 'important')

    # Only the first N unique recommendations make it into the report.
    _MAX_RECOMMENDATIONS = 10

    # Result keys searched for recommendations; the first key present in a
    # task result wins and the remaining keys are ignored for that task.
    _RECOMMENDATION_KEYS = ('recommendations', 'final_recommendations', 'improvements')

    # Role keyword -> ((result key, heading, max items), ...). The first
    # keyword contained in a task's suggested role selects the sections.
    _FINDINGS_BY_ROLE = (
        ('Researcher', (('key_points', 'Research Findings', 5),)),
        ('Analyst', (('insights', 'Analytical Insights', 5),)),
        ('Critic', (('strengths', 'Identified Strengths', 3),
                    ('gaps', 'Identified Gaps', 3))),
    )

    def __init__(self):
        self.section_generators = {
            'executive_summary': self._generate_executive_summary,
            'task_analysis': self._generate_task_analysis,
            'agent_contributions': self._generate_agent_contributions,
            'key_findings': self._generate_key_findings,
            'recommendations': self._generate_recommendations,
            'confidence_analysis': self._generate_confidence_analysis
        }

    def generate_report(self,
                        workflow_result: Dict[str, Any],
                        problem_statement: str,
                        include_sections: Optional[List[str]] = None) -> str:
        """Generate a comprehensive report from workflow results.

        Args:
            workflow_result: Result mapping of a workflow run.
            problem_statement: The problem text shown in the report header.
            include_sections: Section names to render, in order. ``None``
                renders every known section; unknown names are skipped.

        Returns:
            The report as Markdown: header, requested sections and footer,
            separated by blank lines.
        """
        if include_sections is None:
            include_sections = list(self.section_generators.keys())

        report_sections = [self._generate_header(problem_statement)]
        report_sections.extend(
            self.section_generators[section](workflow_result, problem_statement)
            for section in include_sections
            if section in self.section_generators
        )
        report_sections.append(self._generate_footer(workflow_result))

        return "\n\n".join(report_sections)

    @staticmethod
    def _as_text_list(value: Any) -> List[str]:
        """Coerce a task-result field into a list of non-blank strings.

        Result fields are not guaranteed to be lists of strings. A bare
        string would be split into single characters by ``list.extend`` or
        slicing, and non-string items have no ``.lower()``; both cases are
        normalized here. ``None`` and blank items are dropped.
        """
        if value is None:
            return []
        items = list(value) if isinstance(value, (list, tuple)) else [value]
        texts = (str(item) for item in items if item is not None)
        return [text for text in texts if text.strip()]

    def _generate_header(self, problem_statement: str) -> str:
        """Generate report header with title, generation time and problem."""
        generated = datetime.now().strftime('%B %d, %Y at %I:%M %p')
        return (
            "# Multi-Agent Analysis Report\n\n"
            f"**Generated:** {generated}\n\n"
            f"**Problem Statement:** {problem_statement}\n\n"
            "---"
        )

    def _generate_executive_summary(self, workflow_result: Dict[str, Any],
                                    problem_statement: str) -> str:
        """Generate executive summary section.

        The summary text is taken from the first task whose ``assigned_to``
        contains ``'Synthesizer'``. If that task has a result without an
        ``executive_summary`` entry a generic sentence is used; if it has no
        result at all, only the completion-rate line is returned.
        """
        tasks = workflow_result.get('tasks', [])
        success_rate = workflow_result.get('success_rate', 0)
        execution_time = workflow_result.get('execution_time', 0)

        synthesis_task = next(
            (t for t in tasks if t.assigned_to and 'Synthesizer' in t.assigned_to),
            None,
        )

        summary = (
            "## Executive Summary\n\n"
            "The multi-agent system successfully analyzed the problem with a "
            f"**{success_rate:.0%} task completion rate** in "
            f"**{execution_time:.1f} seconds**.\n\n"
        )

        if synthesis_task and synthesis_task.result:
            result = synthesis_task.result
            if isinstance(result, dict) and 'executive_summary' in result:
                # str() keeps the concatenation safe for non-string payloads.
                summary += str(result['executive_summary'])
            else:
                summary += "The analysis revealed key insights across multiple dimensions of the problem."

        return summary

    def _generate_task_analysis(self, workflow_result: Dict[str, Any],
                                problem_statement: str) -> str:
        """Generate task analysis section: totals plus a per-agent task list.

        Tasks are grouped by ``assigned_to`` (``"Unassigned"`` when empty) in
        first-seen order. Completed tasks are marked with a check, every
        other status with a cross.
        """
        tasks = workflow_result.get('tasks', [])

        completed_count = sum(1 for t in tasks if t.status == TaskStatus.COMPLETED)
        failed_count = sum(1 for t in tasks if t.status == TaskStatus.FAILED)

        parts = [
            "## Task Analysis\n\n",
            f"**Total Tasks:** {len(tasks)}\n",
            f"**Completed:** {completed_count}\n",
            f"**Failed:** {failed_count}\n\n",
            "### Tasks by Agent\n\n",
        ]

        tasks_by_agent = {}
        for task in tasks:
            tasks_by_agent.setdefault(task.assigned_to or "Unassigned", []).append(task)

        for agent, agent_tasks in tasks_by_agent.items():
            parts.append(f"**{agent}:**\n")
            for task in agent_tasks:
                status_emoji = "✓" if task.status == TaskStatus.COMPLETED else "✗"
                # A confidence of 0 is treated as "not scored" and omitted.
                confidence = f"({task.confidence:.0%} confidence)" if task.confidence > 0 else ""
                parts.append(f"- {status_emoji} {task.description} {confidence}\n")
            parts.append("\n")

        return "".join(parts)

    def _generate_agent_contributions(self, workflow_result: Dict[str, Any],
                                      problem_statement: str) -> str:
        """Generate agent contributions section, one subsection per agent.

        Missing statistics default to 0 and a missing role to ``'Unknown'``.
        """
        contributions = workflow_result.get('agent_contributions', {})

        parts = ["## Agent Contributions\n\n"]
        for agent, stats in contributions.items():
            role = stats.get('role', 'Unknown')
            tasks_completed = stats.get('tasks_completed', 0)
            avg_confidence = stats.get('average_confidence', 0)
            exec_time = stats.get('total_execution_time', 0)

            parts.append(f"### {agent} ({role})\n")
            parts.append(f"- Tasks Completed: {tasks_completed}\n")
            parts.append(f"- Average Confidence: {avg_confidence:.0%}\n")
            parts.append(f"- Total Execution Time: {exec_time:.1f}s\n\n")

        return "".join(parts)

    def _generate_key_findings(self, workflow_result: Dict[str, Any],
                               problem_statement: str) -> str:
        """Generate key findings section from completed tasks.

        Only completed tasks with a non-empty ``dict`` result contribute.
        Which result keys are reported depends on the task's
        ``metadata['suggested_role']`` (see ``_FINDINGS_BY_ROLE``). A heading
        is emitted once per contributing task, so it can repeat.
        """
        tasks = workflow_result.get('tasks', [])

        parts = ["## Key Findings\n\n"]
        for task in tasks:
            result = task.result
            if task.status != TaskStatus.COMPLETED or not result or not isinstance(result, dict):
                continue

            # Tolerate a missing metadata mapping or a None role.
            role = (task.metadata or {}).get('suggested_role') or ''
            sections = next(
                (spec for keyword, spec in self._FINDINGS_BY_ROLE if keyword in role),
                (),
            )
            for key, heading, limit in sections:
                if key not in result:
                    continue
                parts.append(f"### {heading}\n")
                parts.extend(
                    f"- {item}\n" for item in self._as_text_list(result[key])[:limit]
                )
                parts.append("\n")

        return "".join(parts)

    def _generate_recommendations(self, workflow_result: Dict[str, Any],
                                  problem_statement: str) -> str:
        """Generate recommendations section grouped by priority.

        Recommendations are collected from completed tasks, de-duplicated
        case-insensitively (first occurrence wins), capped at
        ``_MAX_RECOMMENDATIONS`` and bucketed by keyword into high, medium
        and low priority. Empty buckets are omitted.
        """
        tasks = workflow_result.get('tasks', [])

        collected = []
        for task in tasks:
            result = task.result
            if task.status != TaskStatus.COMPLETED or not isinstance(result, dict):
                continue
            for key in self._RECOMMENDATION_KEYS:
                if key in result:
                    collected.extend(self._as_text_list(result[key]))
                    break

        unique_recommendations = []
        seen = set()
        for rec in collected:
            fingerprint = rec.strip().lower()
            if fingerprint not in seen:
                seen.add(fingerprint)
                unique_recommendations.append(rec)

        # Insertion order of this dict is the order of the report headings.
        buckets = {'High': [], 'Medium': [], 'Low': []}
        for rec in unique_recommendations[:self._MAX_RECOMMENDATIONS]:
            lowered = rec.lower()
            if any(word in lowered for word in self._HIGH_PRIORITY_WORDS):
                buckets['High'].append(rec)
            elif any(word in lowered for word in self._MEDIUM_PRIORITY_WORDS):
                buckets['Medium'].append(rec)
            else:
                buckets['Low'].append(rec)

        parts = ["## Recommendations\n\n"]
        for label, recs in buckets.items():
            if not recs:
                continue
            parts.append(f"### {label} Priority\n")
            parts.extend(f"- {rec}\n" for rec in recs)
            parts.append("\n")

        return "".join(parts)

    def _generate_confidence_analysis(self, workflow_result: Dict[str, Any],
                                      problem_statement: str) -> str:
        """Generate confidence analysis section.

        The overall score is the mean confidence of tasks whose confidence
        is above 0; it is reported as ``N/A`` when no task qualifies, instead
        of the ``nan`` a mean over an empty list would produce. The
        distribution counts every task, so tasks with confidence 0 land in
        the low bucket.
        """
        tasks = workflow_result.get('tasks', [])
        contributions = workflow_result.get('agent_contributions', {})

        scored = [t.confidence for t in tasks if t.confidence > 0]
        overall = f"{sum(scored) / len(scored):.0%}" if scored else "N/A"

        parts = [
            "## Confidence Analysis\n\n",
            f"**Overall Confidence Score:** {overall}\n\n",
            "### Confidence by Agent Role\n",
        ]

        for stats in contributions.values():
            avg_conf = stats.get('average_confidence', 0)
            role = stats.get('role', 'Unknown')
            parts.append(f"- **{role}**: {avg_conf:.0%}\n")

        high_count = sum(1 for t in tasks if t.confidence >= 0.8)
        medium_count = sum(1 for t in tasks if 0.5 <= t.confidence < 0.8)
        low_count = sum(1 for t in tasks if t.confidence < 0.5)

        parts.append("\n### Confidence Distribution\n")
        parts.append(f"- High Confidence (≥80%): {high_count} tasks\n")
        parts.append(f"- Medium Confidence (50-79%): {medium_count} tasks\n")
        parts.append(f"- Low Confidence (<50%): {low_count} tasks\n")

        return "".join(parts)

    def _generate_footer(self, workflow_result: Dict[str, Any]) -> str:
        """Generate report footer with timing details.

        Uses ``workflow_result['timestamp']`` when present, otherwise the
        current time. A timestamp without ``strftime`` (for example a string)
        is printed as-is rather than raising.
        """
        execution_time = workflow_result.get('execution_time', 0)
        timestamp = workflow_result.get('timestamp') or datetime.now()

        if hasattr(timestamp, 'strftime'):
            generated_at = timestamp.strftime('%B %d, %Y at %I:%M %p')
        else:
            generated_at = str(timestamp)

        return (
            "---\n\n"
            "**Report Generation Details:**\n"
            f"- Analysis completed in {execution_time:.1f} seconds\n"
            f"- Report generated at {generated_at}\n"
            "- Powered by Multi-Agent AI Collaboration System"
        )
|
|
|
|
|
def create_gradio_interface():
    """Create the main Gradio interface for the multi-agent system.

    Builds the Blocks layout (an initialization row followed by four tabs:
    problem analysis, agent details, report generation, example problems)
    and wires each button to one of the handler closures defined below.

    Returns:
        The assembled ``gr.Blocks`` application; the caller launches it.

    NOTE(review): ``coordinator``, ``current_workflow`` and
    ``current_problem`` live in this function's closure, so there is a single
    copy per interface instance rather than one per browser session.
    """
    coordinator = None
    visualizer = WorkflowVisualizer()
    report_generator = ReportGenerator()

    # Results of the most recent analysis, consumed by the report tab.
    current_workflow = None
    current_problem = ""

    def initialize_agents(api_key: str, model: str = Config.DEFAULT_MODEL) -> str:
        """Initialize the multi-agent system.

        Creates the LLM client, the coordinator and one agent per specialist
        role, and registers the specialists with the coordinator.

        Returns:
            A status message for the UI. Failures are reported in the
            message rather than raised.
        """
        nonlocal coordinator

        if not api_key:
            return "Please provide an OpenAI API key to initialize the agents."

        try:
            llm = ChatOpenAI(
                api_key=api_key,
                model=model,
                temperature=Config.TEMPERATURE,
                max_tokens=Config.MAX_TOKENS
            )

            coordinator = CoordinatorAgent("Coordinator", llm)

            specialists = (
                ResearcherAgent("Researcher-1", llm),
                AnalystAgent("Analyst-1", llm),
                CriticAgent("Critic-1", llm),
                SynthesizerAgent("Synthesizer-1", llm),
            )
            for specialist in specialists:
                coordinator.register_agent(specialist)

            return f"Successfully initialized multi-agent system with {len(coordinator.agents)} agents."

        except Exception as e:
            # UI boundary: keep the traceback in the log, show the message.
            logger.exception("Error initializing agents")
            return f"Error initializing agents: {str(e)}"

    async def analyze_problem(problem: str, execution_mode: str) -> Tuple[str, Any, Any, Any]:
        """Analyze a problem using the multi-agent system.

        Decomposes the problem into tasks, executes them through the
        coordinator and builds the three result figures.

        Returns:
            ``(status text, workflow graph, timeline chart, heatmap)``. The
            three figures are ``None`` whenever the analysis did not run to
            completion.
        """
        nonlocal current_workflow, current_problem

        if not coordinator:
            return "Please initialize the agents first.", None, None, None

        if not problem:
            return "Please enter a problem to analyze.", None, None, None

        current_problem = problem

        try:
            logger.info("Decomposing problem into subtasks...")
            tasks = await coordinator.decompose_problem(problem)

            if not tasks:
                return "Failed to decompose problem into tasks.", None, None, None

            logger.info(f"Executing {len(tasks)} tasks using {execution_mode} mode...")
            parallel = execution_mode == "Parallel"

            current_workflow = await coordinator.execute_workflow(tasks, parallel=parallel)

            graph_figure = visualizer.create_workflow_graph(
                current_workflow['workflow_graph'],
                active_agents=list(coordinator.agents.keys())
            )
            timeline_figure = visualizer.create_task_timeline(tasks)
            heatmap_figure = visualizer.create_confidence_heatmap(
                current_workflow['agent_contributions']
            )

            success_rate = current_workflow['success_rate']
            execution_time = current_workflow['execution_time']

            status = (
                "Analysis completed successfully!\n"
                "\n"
                f"- Tasks executed: {len(tasks)}\n"
                f"- Success rate: {success_rate:.0%}\n"
                f"- Execution time: {execution_time:.1f} seconds\n"
                f"- Agents involved: {len(coordinator.agents)}"
            )

            return status, graph_figure, timeline_figure, heatmap_figure

        except Exception as e:
            logger.exception("Error analyzing problem")
            return f"Error during analysis: {str(e)}", None, None, None

    def generate_report(selected_sections: List[str]) -> str:
        """Generate a report from the current workflow results.

        Returns the Markdown report, or an explanatory message when no
        analysis has been run yet or report generation fails.
        """
        if not current_workflow:
            return "No analysis results available. Please run an analysis first."

        try:
            return report_generator.generate_report(
                current_workflow,
                current_problem,
                include_sections=selected_sections
            )
        except Exception as e:
            logger.exception("Error generating report")
            return f"Error generating report: {str(e)}"

    def get_agent_details(agent_name: str) -> str:
        """Get detailed information about a specific agent as Markdown.

        Lists the agent's role, status, current task and its five most
        recently completed tasks.
        """
        if not coordinator or agent_name not in coordinator.agents:
            return "Agent not found or system not initialized."

        agent = coordinator.agents[agent_name]
        current_task = agent.current_task.description if agent.current_task else 'None'

        details = (
            f"## Agent: {agent.name}\n"
            "\n"
            f"**Role:** {agent.role.value}\n"
            "\n"
            f"**Status:** {'Active' if agent.active else 'Inactive'}\n"
            "\n"
            f"**Completed Tasks:** {len(agent.completed_tasks)}\n"
            "\n"
            f"**Current Task:** {current_task}\n"
            "\n"
            "### Recent Tasks:\n"
        )

        for task in agent.completed_tasks[-5:]:
            # A task missing either timestamp has no measurable duration;
            # subtracting None would raise TypeError.
            if task.created_at and task.completed_at:
                elapsed = (task.completed_at - task.created_at).total_seconds()
                duration = f"{elapsed:.1f}s"
            else:
                duration = "n/a"

            details += f"\n- **{task.id}**: {task.description}"
            details += f"\n - Status: {task.status.value}"
            details += f"\n - Confidence: {task.confidence:.0%}"
            details += f"\n - Execution Time: {duration}\n"

        return details

    custom_css = """
    .gradio-container {
        font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', sans-serif;
    }

    .gr-button-primary {
        background-color: #2563eb !important;
        border-color: #2563eb !important;
    }

    .gr-button-primary:hover {
        background-color: #1d4ed8 !important;
        border-color: #1d4ed8 !important;
    }

    .container {
        max-width: 1200px;
        margin: 0 auto;
    }

    h1 {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        -webkit-background-clip: text;
        -webkit-text-fill-color: transparent;
        text-align: center;
        font-size: 2.5rem;
        font-weight: 800;
        margin-bottom: 1rem;
    }

    h2 {
        color: #1f2937;
        font-size: 1.5rem;
        font-weight: 600;
        margin-top: 2rem;
        margin-bottom: 1rem;
    }

    .status-box {
        background-color: #f3f4f6;
        border-radius: 8px;
        padding: 1rem;
        margin: 1rem 0;
        border-left: 4px solid #3b82f6;
    }
    """

    with gr.Blocks(title="Multi-Agent AI Collaboration System",
                   theme=gr.themes.Base(),
                   css=custom_css) as interface:

        gr.Markdown("""
        # Multi-Agent AI Collaboration System

        Advanced AI system with specialized agents working together to solve complex problems through intelligent task decomposition and parallel processing.
        """)

        # --- Initialization row: API key, model choice, init button ---
        with gr.Row():
            with gr.Column(scale=3):
                api_key_input = gr.Textbox(
                    label="OpenAI API Key",
                    placeholder="sk-...",
                    type="password"
                )
            with gr.Column(scale=1):
                model_select = gr.Dropdown(
                    choices=["gpt-4", "gpt-3.5-turbo"],
                    value=Config.DEFAULT_MODEL,
                    label="Model"
                )
            with gr.Column(scale=1):
                init_button = gr.Button("Initialize Agents", variant="primary")

        init_status = gr.Textbox(label="Initialization Status", interactive=False)

        with gr.Tabs():

            with gr.TabItem("Problem Analysis"):
                gr.Markdown("### Enter a complex problem for multi-agent analysis")

                problem_input = gr.Textbox(
                    label="Problem Statement",
                    placeholder="Example: Analyze the potential impact of AI on healthcare delivery in the next 5 years",
                    lines=3
                )

                with gr.Row():
                    execution_mode = gr.Radio(
                        choices=["Sequential", "Parallel"],
                        value="Parallel",
                        label="Execution Mode"
                    )
                    analyze_button = gr.Button("Analyze Problem", variant="primary")

                analysis_status = gr.Textbox(
                    label="Analysis Status",
                    interactive=False,
                    lines=5
                )

                with gr.Row():
                    workflow_graph = gr.Plot(label="Agent Collaboration Network")

                with gr.Row():
                    timeline_chart = gr.Plot(label="Task Execution Timeline")
                    confidence_heatmap = gr.Plot(label="Agent Performance Metrics")

            with gr.TabItem("Agent Details"):
                gr.Markdown("### View detailed information about each agent")

                # Names must match the agents created in initialize_agents.
                agent_selector = gr.Dropdown(
                    choices=["Researcher-1", "Analyst-1", "Critic-1", "Synthesizer-1"],
                    label="Select Agent"
                )

                agent_details_button = gr.Button("Get Agent Details")

                agent_details_output = gr.Markdown()

            with gr.TabItem("Report Generation"):
                gr.Markdown("### Generate comprehensive analysis report")

                section_selector = gr.CheckboxGroup(
                    choices=[
                        "executive_summary",
                        "task_analysis",
                        "agent_contributions",
                        "key_findings",
                        "recommendations",
                        "confidence_analysis"
                    ],
                    value=[
                        "executive_summary",
                        "key_findings",
                        "recommendations"
                    ],
                    label="Select Report Sections"
                )

                generate_report_button = gr.Button("Generate Report", variant="primary")

                report_output = gr.Markdown()

            with gr.TabItem("Example Problems"):
                gr.Markdown("""
                ### Example Problems for Analysis

                Click on any example to load it into the analysis tab:

                1. **Business Strategy**: "Develop a comprehensive strategy for a traditional retail company to transition to e-commerce while maintaining customer loyalty"

                2. **Technology Assessment**: "Evaluate the potential risks and benefits of implementing blockchain technology in supply chain management"

                3. **Market Analysis**: "Analyze the competitive landscape for electric vehicles and identify key success factors for new entrants"

                4. **Policy Evaluation**: "Assess the implications of remote work policies on organizational culture and productivity"

                5. **Innovation Planning**: "Design an innovation framework for a healthcare organization to integrate AI-powered diagnostic tools"
                """)

                example_problems = [
                    "Develop a comprehensive strategy for a traditional retail company to transition to e-commerce while maintaining customer loyalty",
                    "Evaluate the potential risks and benefits of implementing blockchain technology in supply chain management",
                    "Analyze the competitive landscape for electric vehicles and identify key success factors for new entrants",
                    "Assess the implications of remote work policies on organizational culture and productivity",
                    "Design an innovation framework for a healthcare organization to integrate AI-powered diagnostic tools"
                ]

                example_buttons = [
                    gr.Button(f"Load Example {number}", size="sm")
                    for number in range(1, len(example_problems) + 1)
                ]

        # --- Event wiring ---
        init_button.click(
            fn=initialize_agents,
            inputs=[api_key_input, model_select],
            outputs=init_status
        )

        # The coroutine function is registered directly: Gradio awaits async
        # handlers itself, so no new event loop is created and torn down via
        # asyncio.run() on every click.
        analyze_button.click(
            fn=analyze_problem,
            inputs=[problem_input, execution_mode],
            outputs=[analysis_status, workflow_graph, timeline_chart, confidence_heatmap]
        )

        agent_details_button.click(
            fn=get_agent_details,
            inputs=agent_selector,
            outputs=agent_details_output
        )

        generate_report_button.click(
            fn=generate_report,
            inputs=section_selector,
            outputs=report_output
        )

        for btn, example_text in zip(example_buttons, example_problems):
            # Bind the text as a default argument so each button keeps its
            # own example instead of the loop's last value.
            btn.click(
                fn=lambda text=example_text: text,
                outputs=problem_input
            )

    return interface
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI and start serving it. share=True is
    # passed through to Gradio's launch() unchanged.
    create_gradio_interface().launch(share=True)