"""Multi-agent analysis platform: coordinated LLM agents with a Gradio dashboard."""

import asyncio
import logging
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
import networkx as nx
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage
from langchain.prompts import ChatPromptTemplate

warnings.filterwarnings('ignore')

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class Config:
    """
    Configuration settings for the multi-agent system.

    This class centralizes settings for model parameters, agent behavior,
    and visualization to allow for easy tuning and management.
    """

    # Language model parameters
    DEFAULT_MODEL = "gpt-4"
    TEMPERATURE = 0.5
    MAX_TOKENS = 2048

    # Agent behavior
    MAX_ITERATIONS = 10
    COLLABORATION_TIMEOUT = 300  # seconds

    # Visualization: node colors keyed by agent role
    NODE_COLORS = {
        'Researcher': '#0077B6',
        'Analyst': '#0096C7',
        'Critic': '#48CAE4',
        'Synthesizer': '#90E0EF',
        'Coordinator': '#ADE8F4'
    }

    # Reporting and benchmarking
    CONFIDENCE_THRESHOLD = 0.7
    BENCHMARK_BASELINE_TIME = 45.0  # seconds; single-agent baseline for comparison
    COMPANY_NAME = "Corporate Intelligence Solutions"

    # Demo mode uses simulated agents and requires no API key
    DEMO_MODE_ENABLED = True


class AgentRole(Enum):
    """Enumeration of the distinct roles an agent can assume in the system."""
    RESEARCHER = "Researcher"
    ANALYST = "Analyst"
    CRITIC = "Critic"
    SYNTHESIZER = "Synthesizer"
    COORDINATOR = "Coordinator"


class TaskStatus(Enum):
    """Enumeration for the possible statuses of a task during its lifecycle."""
    PENDING = "Pending"
    IN_PROGRESS = "In Progress"
    COMPLETED = "Completed"
    FAILED = "Failed"


@dataclass
class Task:
    """
    Represents a unit of work to be executed by an agent.

    Attributes:
        id (str): A unique identifier for the task.
        description (str): A detailed description of the work to be done.
        assigned_to (Optional[str]): The name of the agent assigned to the task.
        status (TaskStatus): The current status of the task.
        dependencies (List[str]): IDs of tasks that must complete before this task can start.
        result (Optional[Any]): The output or result of the task execution.
        confidence (float): A score from 0.0 to 1.0 indicating confidence in the task's result.
        created_at (datetime): The timestamp when the task was created.
        completed_at (Optional[datetime]): The timestamp when the task was completed or failed.
        performance_metrics (Dict[str, float]): Metrics related to the task's performance, such as execution time.
        metadata (Dict[str, Any]): Routing hints, such as the suggested agent role.
    """
    id: str
    description: str
    assigned_to: Optional[str] = None
    status: TaskStatus = TaskStatus.PENDING
    dependencies: List[str] = field(default_factory=list)
    result: Optional[Any] = None
    confidence: float = 0.0
    created_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
    performance_metrics: Dict[str, float] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
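

# Illustrative sketch (not used by the application): assembling a small task
# chain by hand. The IDs, description text, and role hints are placeholders.
def _example_task_chain() -> List[Task]:
    research = Task(id="demo_1", description="Research topic X",
                    metadata={"suggested_role": "Researcher"})
    analysis = Task(id="demo_2", description="Analyze the research findings",
                    dependencies=[research.id],  # runs only after research completes
                    metadata={"suggested_role": "Analyst"})
    return [research, analysis]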


class BaseAgent:
    """
    A base class for all AI agents in the system.

    This class provides the fundamental structure for agents: task processing
    logic, a record of completed tasks, and a simulated execution path for
    demo mode. Each specialized agent extends this class to implement
    role-specific behavior.
    """

    def __init__(self, name: str, role: AgentRole, llm: Optional[ChatOpenAI] = None):
        """
        Initializes a BaseAgent instance.

        Args:
            name (str): The unique name of the agent.
            role (AgentRole): The role of the agent in the system.
            llm (Optional[ChatOpenAI]): The language model instance for generating
                responses. If None, the agent runs in simulation mode.
        """
        self.name = name
        self.role = role
        self.llm = llm
        self.current_task: Optional[Task] = None
        self.completed_tasks: List[Task] = []

    async def process_task(self, task: Task) -> Task:
        """
        Processes a given task, updates its status, and records performance.

        This method orchestrates the execution of a task along either the live
        (LLM) or simulated path, captures metrics, and manages task state
        transitions.

        Args:
            task (Task): The task object to be processed.

        Returns:
            Task: The processed task with updated status, result, and metrics.
        """
        self.current_task = task
        task.status = TaskStatus.IN_PROGRESS
        task.assigned_to = self.name
        start_time = datetime.now()

        try:
            if self.llm:
                result = await self._execute_task(task)
            else:
                result = await self._simulate_task_execution(task)

            task.result = result
            task.status = TaskStatus.COMPLETED
            task.completed_at = datetime.now()
            task.confidence = self._calculate_confidence(result)

        except Exception as e:
            logger.error(f"Agent {self.name} failed to process task {task.id}: {str(e)}")
            task.status = TaskStatus.FAILED
            task.result = f"Error: {str(e)}"
            task.confidence = 0.0

        finally:
            # By this point the task has either completed or failed.
            if task.status != TaskStatus.IN_PROGRESS:
                task.completed_at = task.completed_at or datetime.now()
                execution_time = (task.completed_at - start_time).total_seconds()
                task.performance_metrics['execution_time'] = execution_time
                self.completed_tasks.append(task)
                self.current_task = None

        return task

    async def _execute_task(self, task: Task) -> Any:
        """
        Executes a task using the assigned language model.
        This method must be implemented by all concrete agent subclasses.
        """
        raise NotImplementedError("Subclasses must implement the _execute_task method.")

    async def _simulate_task_execution(self, task: Task) -> Any:
        """
        Simulates task execution for demonstration purposes when no LLM is available.
        Provides realistic-looking placeholder data based on the agent's role.
        """
        await asyncio.sleep(np.random.uniform(1, 3))  # simulate model latency
        simulation_templates = {
            AgentRole.RESEARCHER: {"findings": f"Comprehensive research on '{task.description}' completed.", "sources": ["Industry Analysis Report", "Academic Study"]},
            AgentRole.ANALYST: {"analysis": f"Detailed analysis of '{task.description}' reveals key trends.", "patterns": ["Identified growth pattern in market segment A."]},
            AgentRole.CRITIC: {"evaluation": f"Critical evaluation of '{task.description}' finds the approach sound but lacking sufficient data validation.", "strengths": ["Logical consistency"], "weaknesses": ["Data sourcing"]},
            AgentRole.SYNTHESIZER: {"synthesis": f"Synthesized findings for '{task.description}' into a coherent strategy.", "recommendations": ["Proceed with strategic initiative X."]}
        }
        return simulation_templates.get(self.role, {"result": "Simulated task completion."})

    def _calculate_confidence(self, result: Any) -> float:
        """Calculates a heuristic confidence score for a task result based on its content."""
        if not result or (isinstance(result, str) and result.startswith("Error:")):
            return 0.0

        # Start from a moderate baseline and reward structured, detailed output.
        base_confidence = 0.6
        if isinstance(result, dict) and len(result) > 1:
            base_confidence += 0.2
        if len(str(result)) > 200:
            base_confidence += 0.15
        return min(0.95, base_confidence)
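

# Illustrative sketch (not part of the application flow): exercising a single
# agent in simulation mode, i.e. with llm=None. All names are placeholders.
def _example_simulated_run() -> Task:
    agent = BaseAgent("Agent-demo", AgentRole.RESEARCHER)  # no LLM -> simulated path
    task = Task(id="demo_solo", description="Survey the market for widgets")
    return asyncio.run(agent.process_task(task))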


class ResearcherAgent(BaseAgent):
    """An agent specializing in gathering comprehensive information and data."""

    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
        super().__init__(name, AgentRole.RESEARCHER, llm)

    async def _execute_task(self, task: Task) -> Any:
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="You are a professional researcher. Your goal is to gather unbiased, comprehensive, and well-sourced information on the given topic. Focus on facts, data, and credible sources."),
            HumanMessage(content=f"Please conduct detailed research on the following topic: {task.description}")
        ])
        response = await self.llm.ainvoke(prompt.format_messages())
        return {"findings": response.content, "sources": "Extracted from various reliable sources."}


class AnalystAgent(BaseAgent):
    """An agent specializing in analyzing data to identify patterns and insights."""

    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
        super().__init__(name, AgentRole.ANALYST, llm)

    async def _execute_task(self, task: Task) -> Any:
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="You are an expert analyst. Your role is to dissect information, identify underlying patterns, trends, and correlations, and present actionable insights. Your analysis must be logical and evidence-based."),
            HumanMessage(content=f"Please analyze the following information and provide a detailed breakdown: {task.description}")
        ])
        response = await self.llm.ainvoke(prompt.format_messages())
        return {"analysis": response.content, "patterns": "Identified key performance indicators and trends."}


class CriticAgent(BaseAgent):
    """An agent specializing in evaluating work for quality, biases, and gaps."""

    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
        super().__init__(name, AgentRole.CRITIC, llm)

    async def _execute_task(self, task: Task) -> Any:
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="You are a meticulous critic. Your function is to rigorously evaluate the provided information, identifying logical fallacies, biases, assumptions, and gaps. Provide constructive feedback for improvement."),
            HumanMessage(content=f"Please provide a critical evaluation of the following: {task.description}")
        ])
        response = await self.llm.ainvoke(prompt.format_messages())
        return {"evaluation": response.content, "strengths": "Identified robust arguments.", "weaknesses": "Flagged potential biases."}


class SynthesizerAgent(BaseAgent):
    """An agent specializing in integrating diverse information into a coherent whole."""

    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
        super().__init__(name, AgentRole.SYNTHESIZER, llm)

    async def _execute_task(self, task: Task) -> Any:
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="You are an expert synthesizer. Your task is to integrate disparate pieces of information, resolve contradictions, and formulate a single, coherent, and comprehensive narrative or strategic plan."),
            HumanMessage(content=f"Please synthesize the following inputs into a unified conclusion: {task.description}")
        ])
        response = await self.llm.ainvoke(prompt.format_messages())
        return {"synthesis": response.content, "recommendations": "Formulated final strategic recommendations."}
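

# Illustrative sketch: the pattern for adding a new specialist agent. This
# hypothetical fact checker is never registered anywhere; it reuses the CRITIC
# role for brevity, and its prompt wording is an assumption.
class _ExampleFactCheckerAgent(BaseAgent):
    """Hypothetical agent showing the extension pattern used above."""

    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
        super().__init__(name, AgentRole.CRITIC, llm)

    async def _execute_task(self, task: Task) -> Any:
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="You are a rigorous fact checker. Verify claims and cite counter-evidence."),
            HumanMessage(content=f"Please fact-check the following: {task.description}")
        ])
        response = await self.llm.ainvoke(prompt.format_messages())
        return {"fact_check": response.content}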


class CoordinatorAgent(BaseAgent):
    """
    The central agent responsible for managing the entire workflow.

    The Coordinator decomposes the main problem, creates and assigns tasks,
    builds a dependency graph, and orchestrates the execution of the workflow
    by the specialized agents.
    """

    def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
        super().__init__(name, AgentRole.COORDINATOR, llm)
        self.agents: Dict[str, BaseAgent] = {}
        self.workflow_graph = nx.DiGraph()

    def register_agent(self, agent: BaseAgent):
        """Registers a specialized agent with the coordinator."""
        self.agents[agent.name] = agent
        self.workflow_graph.add_node(agent.name, role=agent.role.value)
        logger.info(f"Registered agent: {agent.name} with role {agent.role.value}")

    def decompose_problem(self, problem: str) -> List[Task]:
        """
        Breaks down a complex problem into a sequence of structured tasks.
        A predefined template is used for structured and repeatable workflows.

        Args:
            problem (str): The high-level problem statement.

        Returns:
            List[Task]: A list of Task objects ready for execution.
        """
        tasks = [
            Task(id="task_1", description=f"Conduct foundational research on: {problem}", metadata={"suggested_role": "Researcher"}),
            Task(id="task_2", description=f"Analyze the research findings for patterns and insights related to: {problem}", metadata={"suggested_role": "Analyst"}),
            Task(id="task_3", description="Critically evaluate the research and analysis for quality, bias, and completeness.", metadata={"suggested_role": "Critic"}),
            Task(id="task_4", description="Synthesize all findings into a final report with actionable recommendations.", metadata={"suggested_role": "Synthesizer"})
        ]
        self._build_dependency_graph(tasks)
        return tasks

    def _build_dependency_graph(self, tasks: List[Task]):
        """Chains tasks sequentially: each task depends on its predecessor."""
        for i in range(len(tasks) - 1):
            tasks[i + 1].dependencies.append(tasks[i].id)

    async def execute_workflow(self, tasks: List[Task]) -> Dict[str, Any]:
        """
        Executes a list of tasks according to their dependencies.

        Tasks whose dependencies are satisfied run in parallel on a thread
        pool (each worker drives its agent coroutine on a private event loop),
        so each dependency "wave" executes concurrently.

        Args:
            tasks (List[Task]): The list of tasks to execute.

        Returns:
            Dict[str, Any]: A dictionary containing the results of the workflow execution.
        """
        start_time = datetime.now()
        self._update_workflow_graph_with_tasks(tasks)

        completed_task_ids = set()
        task_dict = {t.id: t for t in tasks}

        while len(completed_task_ids) < len(tasks):
            # A task is ready once all of its dependencies have completed.
            ready_tasks = [
                t for t in tasks
                if t.status == TaskStatus.PENDING and all(dep in completed_task_ids for dep in t.dependencies)
            ]
            if not ready_tasks:
                failed_tasks = [t for t in tasks if t.status == TaskStatus.FAILED]
                if len(completed_task_ids) + len(failed_tasks) == len(tasks):
                    break
                logger.error("Workflow stalled: circular dependency or unresolved failed dependency.")
                break

            with ThreadPoolExecutor(max_workers=len(self.agents)) as executor:
                future_to_task = {}
                for task in ready_tasks:
                    agent = self._select_agent_for_task(task)
                    if agent:
                        task.status = TaskStatus.IN_PROGRESS
                        future = executor.submit(asyncio.run, agent.process_task(task))
                        future_to_task[future] = task.id
                    else:
                        # Fail fast when no registered agent matches the suggested
                        # role; otherwise the scheduling loop would spin forever.
                        task.status = TaskStatus.FAILED
                        task.result = "Error: no suitable agent registered for this task."
                        self.workflow_graph.nodes[task.id]['status'] = TaskStatus.FAILED.value

                for future in as_completed(future_to_task):
                    task_id = future_to_task[future]
                    try:
                        completed_task = future.result()
                        task_dict[task_id] = completed_task
                        if completed_task.status == TaskStatus.COMPLETED:
                            completed_task_ids.add(task_id)
                        self.workflow_graph.nodes[task_id]['status'] = completed_task.status.value
                    except Exception as exc:
                        logger.error(f"Task {task_id} generated an exception: {exc}")
                        task_dict[task_id].status = TaskStatus.FAILED
                        self.workflow_graph.nodes[task_id]['status'] = TaskStatus.FAILED.value

        final_tasks = list(task_dict.values())
        return self._compile_workflow_results(final_tasks, start_time)

    def _select_agent_for_task(self, task: Task) -> Optional[BaseAgent]:
        """Selects a registered agent whose role matches the task's suggested role."""
        suggested_role = task.metadata.get("suggested_role")
        for agent in self.agents.values():
            if agent.role.value == suggested_role:
                return agent
        return None

    def _update_workflow_graph_with_tasks(self, tasks: List[Task]):
        """Adds task nodes and edges to the master workflow graph."""
        for task in tasks:
            self.workflow_graph.add_node(task.id, task_description=task.description, status=task.status.value)
            for dep_id in task.dependencies:
                self.workflow_graph.add_edge(dep_id, task.id)

            # Connect the task to the agent expected to execute it.
            agent = self._select_agent_for_task(task)
            if agent:
                self.workflow_graph.add_edge(agent.name, task.id)

    def _compile_workflow_results(self, tasks: List[Task], start_time: datetime) -> Dict[str, Any]:
        """Compiles the final results and metrics of the workflow."""
        execution_time = (datetime.now() - start_time).total_seconds()
        successful_tasks = [t for t in tasks if t.status == TaskStatus.COMPLETED]

        agent_contributions = {}
        for agent in self.agents.values():
            agent_tasks = [t for t in tasks if t.assigned_to == agent.name]
            confidences = [t.confidence for t in agent_tasks if t.confidence > 0]
            agent_contributions[agent.name] = {
                "role": agent.role.value,
                "tasks_completed": [t.id for t in agent_tasks],
                # Guard against an empty list, which would make np.mean return NaN.
                "average_confidence": float(np.mean(confidences)) if confidences else 0.0
            }

        return {
            "tasks": tasks,
            "execution_time": execution_time,
            "success_rate": len(successful_tasks) / len(tasks) if tasks else 0,
            "agent_contributions": agent_contributions,
            "workflow_graph": self.workflow_graph
        }
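

# Illustrative sketch: driving the coordinator end to end in demo mode (no API
# key; every agent takes its simulated path). Agent names and the problem
# statement are placeholders.
async def _example_demo_workflow() -> Dict[str, Any]:
    coordinator = CoordinatorAgent("Coordinator-demo")
    for agent in (ResearcherAgent("Researcher-demo"), AnalystAgent("Analyst-demo"),
                  CriticAgent("Critic-demo"), SynthesizerAgent("Synthesizer-demo")):
        coordinator.register_agent(agent)
    tasks = coordinator.decompose_problem("Assess market entry options for product X")
    return await coordinator.execute_workflow(tasks)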


class WorkflowVisualizer:
    """Handles the creation of all visualizations for the workflow."""

    def __init__(self):
        """Initializes the visualizer with the role-based color map."""
        self.color_map = Config.NODE_COLORS

    def create_workflow_graph(self, G: nx.DiGraph) -> go.Figure:
        """Creates an interactive Plotly graph of the agent collaboration network."""
        if not G.nodes():
            return self._create_empty_figure("No workflow data available.")

        # A fixed seed keeps the layout stable across re-renders.
        pos = nx.spring_layout(G, k=0.9, iterations=50, seed=42)

        edge_x, edge_y = [], []
        for edge in G.edges():
            x0, y0 = pos[edge[0]]
            x1, y1 = pos[edge[1]]
            edge_x.extend([x0, x1, None])
            edge_y.extend([y0, y1, None])

        edge_trace = go.Scatter(x=edge_x, y=edge_y, line=dict(width=1, color='#888'), hoverinfo='none', mode='lines')

        node_x, node_y, node_text, node_colors, node_sizes = [], [], [], [], []
        for node in G.nodes():
            x, y = pos[node]
            node_x.append(x)
            node_y.append(y)

            # Agent nodes carry a 'role' attribute; task nodes carry a 'status'.
            is_agent = 'role' in G.nodes[node]
            if is_agent:
                role = G.nodes[node]['role']
                node_text.append(f"<b>{node}</b><br>{role}")
                node_colors.append(self.color_map.get(role, '#ccc'))
                node_sizes.append(35)
            else:
                status = G.nodes[node].get('status', 'Pending')
                node_text.append(f"<b>{node}</b><br>Status: {status}")
                node_colors.append('#6c757d' if status == 'Pending' else '#28a745' if status == 'Completed' else '#dc3545')
                node_sizes.append(20)

        node_trace = go.Scatter(
            x=node_x, y=node_y, mode='markers',
            text=node_text, hovertemplate='%{text}<extra></extra>',
            marker=dict(color=node_colors, size=node_sizes, line_width=1, line_color='#fff')
        )

        fig = go.Figure(data=[edge_trace, node_trace], layout=self._get_base_layout("Agent Collaboration Network"))
        return fig

    def create_task_timeline(self, tasks: List[Task]) -> go.Figure:
        """Creates a Plotly timeline (Gantt chart) of task execution."""
        if not tasks or not any(t.created_at and t.completed_at for t in tasks):
            return self._create_empty_figure("No task execution data to display.")

        df_data = []
        for task in tasks:
            if task.created_at and task.completed_at:
                df_data.append(dict(
                    Task=task.id, Start=task.created_at, Finish=task.completed_at,
                    Agent=task.assigned_to or "Unassigned",
                    # Color bars by role so they match the role color map.
                    Role=task.metadata.get("suggested_role", "Unassigned")
                ))

        if not df_data:
            return self._create_empty_figure("No completed tasks with timing data.")

        df = pd.DataFrame(df_data)
        fig = px.timeline(df, x_start="Start", x_end="Finish", y="Agent", color="Role",
                          color_discrete_map=self.color_map, title="Task Execution Timeline")
        fig.update_layout(showlegend=False, plot_bgcolor='white', font_family="sans-serif")
        fig.update_xaxes(title="Time")
        fig.update_yaxes(title="Agent", categoryorder='total ascending')
        return fig

    def create_performance_comparison(self, execution_time: float) -> go.Figure:
        """Creates a bar chart comparing multi-agent vs. single-agent performance."""
        categories = ['Single Agent (Baseline)', 'Multi-Agent System']
        times = [Config.BENCHMARK_BASELINE_TIME, execution_time]
        colors = ['#6c757d', '#0077B6']

        fig = go.Figure(
            data=[go.Bar(x=categories, y=times, text=[f'{t:.1f}s' for t in times],
                         textposition='auto', marker_color=colors)],
            layout=self._get_base_layout("Performance Comparison")
        )
        # The shared base layout hides axis labels; re-enable them for the bar chart.
        fig.update_xaxes(showticklabels=True)
        fig.update_yaxes(showticklabels=True, title_text="Average Completion Time (seconds)")
        return fig

    def _get_base_layout(self, title: str) -> go.Layout:
        """Returns a base layout for Plotly figures for a consistent look."""
        return go.Layout(
            title={'text': title, 'y': 0.9, 'x': 0.5, 'xanchor': 'center', 'yanchor': 'top'},
            showlegend=False,
            hovermode='closest',
            margin=dict(b=20, l=5, r=5, t=40),
            xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
            yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
            plot_bgcolor='white',
            paper_bgcolor='white',
            font_family="sans-serif"
        )

    def _create_empty_figure(self, message: str) -> go.Figure:
        """Creates a blank figure with a text message."""
        fig = go.Figure()
        fig.add_annotation(text=message, xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=14, color="#888"))
        fig.update_layout(xaxis_visible=False, yaxis_visible=False, plot_bgcolor='white')
        return fig
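

# Illustrative sketch: rendering a collaboration graph outside the Gradio app.
# The toy graph below mimics the node attributes the visualizer expects.
def _example_visualization() -> go.Figure:
    g = nx.DiGraph()
    g.add_node("Researcher-demo", role="Researcher")
    g.add_node("task_1", task_description="demo task", status="Completed")
    g.add_edge("Researcher-demo", "task_1")
    return WorkflowVisualizer().create_workflow_graph(g)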


class ReportGenerator:
    """Generates a comprehensive, professional HTML report from workflow results."""

    def generate_report(self, workflow_result: Dict[str, Any], problem_statement: str) -> str:
        """
        Generates a full HTML report from the workflow results.

        Args:
            workflow_result (Dict[str, Any]): The compiled results from the Coordinator.
            problem_statement (str): The initial problem statement.

        Returns:
            str: A formatted HTML string representing the report.
        """
        tasks = workflow_result.get('tasks', [])
        synthesis_task = next((t for t in reversed(tasks) if t.status == TaskStatus.COMPLETED and t.metadata.get("suggested_role") == "Synthesizer"), None)

        report = f"""
        <div style="font-family: sans-serif; color: #333;">
            <div style="background-color: #f8f9fa; padding: 20px; border-radius: 5px; border: 1px solid #dee2e6; margin-bottom: 20px;">
                <h1 style="color: #003366; margin: 0;">Analysis Report</h1>
                <p style="margin: 5px 0 0;"><strong>Problem Statement:</strong> {problem_statement}</p>
                <p style="margin: 5px 0 0; font-size: 0.9em; color: #6c757d;">Generated on: {datetime.now().strftime('%B %d, %Y at %I:%M %p')}</p>
            </div>
            {self._generate_summary(workflow_result)}
            {self._generate_recommendations(synthesis_task)}
            {self._generate_task_breakdown(tasks)}
        </div>
        """
        return report

    def _generate_summary(self, result: Dict[str, Any]) -> str:
        """Generates the executive summary section of the report."""
        return f"""
        <div style="margin-bottom: 20px;">
            <h2 style="border-bottom: 2px solid #0077B6; padding-bottom: 5px; color: #003366;">Executive Summary</h2>
            <p>The multi-agent system addressed the problem, achieving a <strong>{result['success_rate']:.0%} success rate</strong> in <strong>{result['execution_time']:.1f} seconds</strong>. The workflow involved {len(result['tasks'])} tasks distributed among specialized agents to ensure comprehensive analysis.</p>
        </div>
        """

    def _generate_recommendations(self, synth_task: Optional[Task]) -> str:
        """Generates the key recommendations section from the Synthesizer's output."""
        content = "<p>No synthesized recommendations were produced.</p>"
        if synth_task and isinstance(synth_task.result, dict):
            recommendations = synth_task.result.get('synthesis', 'No specific recommendations provided in the synthesis.')
            content = f"<p>{recommendations}</p>"

        return f"""
        <div style="margin-bottom: 20px;">
            <h2 style="border-bottom: 2px solid #0077B6; padding-bottom: 5px; color: #003366;">Key Recommendations</h2>
            {content}
        </div>
        """

    def _generate_task_breakdown(self, tasks: List[Task]) -> str:
        """Generates a detailed breakdown of each task in the workflow."""
        rows = ""
        for task in tasks:
            status_color = '#28a745' if task.status == TaskStatus.COMPLETED else '#dc3545' if task.status == TaskStatus.FAILED else '#6c757d'
            rows += f"""
            <tr>
                <td style="padding: 8px; border: 1px solid #dee2e6;">{task.id}</td>
                <td style="padding: 8px; border: 1px solid #dee2e6;">{task.assigned_to or 'N/A'}</td>
                <td style="padding: 8px; border: 1px solid #dee2e6;">{task.description}</td>
                <td style="padding: 8px; border: 1px solid #dee2e6; color: {status_color}; font-weight: bold;">{task.status.value}</td>
                <td style="padding: 8px; border: 1px solid #dee2e6;">{task.confidence:.0%}</td>
            </tr>
            """
        return f"""
        <div>
            <h2 style="border-bottom: 2px solid #0077B6; padding-bottom: 5px; color: #003366;">Task Execution Details</h2>
            <table style="width: 100%; border-collapse: collapse; font-size: 0.9em;">
                <thead style="background-color: #e9ecef;">
                    <tr>
                        <th style="padding: 8px; border: 1px solid #dee2e6; text-align: left;">Task ID</th>
                        <th style="padding: 8px; border: 1px solid #dee2e6; text-align: left;">Agent</th>
                        <th style="padding: 8px; border: 1px solid #dee2e6; text-align: left;">Description</th>
                        <th style="padding: 8px; border: 1px solid #dee2e6; text-align: left;">Status</th>
                        <th style="padding: 8px; border: 1px solid #dee2e6; text-align: left;">Confidence</th>
                    </tr>
                </thead>
                <tbody>{rows}</tbody>
            </table>
        </div>
        """
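

# Illustrative sketch: producing the HTML report from the demo-mode workflow
# helper defined above. The problem statement is a placeholder.
def _example_report() -> str:
    workflow = asyncio.run(_example_demo_workflow())
    return ReportGenerator().generate_report(workflow, "Assess market entry options for product X")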


def create_gradio_interface():
    """
    Creates and configures the main Gradio interface for the system.
    This function defines the layout, components, and event handlers for the UI.
    """
    visualizer = WorkflowVisualizer()
    report_generator = ReportGenerator()

    # Mutable closure state shared by the event handlers below.
    state = {
        "coordinator": None,
        "current_workflow": None,
        "current_problem": ""
    }

    def initialize_system(api_key: str, model: str, demo_mode: bool) -> str:
        """Initializes the coordinator and specialized agents."""
        llm = None
        if not demo_mode:
            if not api_key:
                return "Error: An OpenAI API key is required for Live Mode."
            llm = ChatOpenAI(api_key=api_key, model=model, temperature=Config.TEMPERATURE, max_tokens=Config.MAX_TOKENS)

        state["coordinator"] = CoordinatorAgent("Coordinator-1", llm)
        agents_to_register = [
            ResearcherAgent("Researcher-1", llm),
            AnalystAgent("Analyst-1", llm),
            CriticAgent("Critic-1", llm),
            SynthesizerAgent("Synthesizer-1", llm)
        ]
        for agent in agents_to_register:
            state["coordinator"].register_agent(agent)

        mode = "Demo Mode" if demo_mode else f"Live Mode ({model})"
        return f"System initialized successfully in {mode} with {len(agents_to_register)} agents."

    async def run_analysis(problem: str) -> Tuple[str, go.Figure, go.Figure, go.Figure, str]:
        """Runs the full analysis workflow for a given problem."""
        if not state["coordinator"]:
            return "Error: System not initialized. Please initialize first.", None, None, None, ""
        if not problem:
            return "Error: Problem statement cannot be empty.", None, None, None, ""

        state["current_problem"] = problem

        try:
            tasks = state["coordinator"].decompose_problem(problem)
            workflow = await state["coordinator"].execute_workflow(tasks)
            state["current_workflow"] = workflow

            status_text = f"Analysis complete. Success Rate: {workflow['success_rate']:.0%}. Total Time: {workflow['execution_time']:.1f}s."
            graph_fig = visualizer.create_workflow_graph(workflow['workflow_graph'])
            timeline_fig = visualizer.create_task_timeline(workflow['tasks'])
            perf_fig = visualizer.create_performance_comparison(workflow['execution_time'])
            report_html = report_generator.generate_report(workflow, problem)

            return status_text, graph_fig, timeline_fig, perf_fig, report_html
        except Exception as e:
            logger.error(f"An error occurred during analysis: {e}")
            return f"An unexpected error occurred: {e}", None, None, None, ""

    custom_css = """
    .gradio-container { max-width: 1400px !important; margin: auto !important; }
    h1 { color: #003366; font-family: sans-serif; text-align: center; }
    .gr-button { font-weight: 600; font-family: sans-serif; }
    .gr-button-primary { background-color: #0077B6 !important; border-color: #0077B6 !important; }
    """

    with gr.Blocks(title="Multi-Agent Analysis Platform", theme=gr.themes.Soft(), css=custom_css) as interface:
        gr.Markdown("<h1>Multi-Agent Analysis Platform</h1>")

        with gr.Row():
            with gr.Column(scale=1):
                with gr.Accordion("System Configuration", open=True):
                    api_key_input = gr.Textbox(label="OpenAI API Key", type="password", info="Required for live mode.")
                    model_select = gr.Dropdown(choices=["gpt-4", "gpt-4-turbo", "gpt-3.5-turbo"], value=Config.DEFAULT_MODEL, label="Language Model")
                    demo_mode_checkbox = gr.Checkbox(label="Run in Demo Mode", value=Config.DEMO_MODE_ENABLED, info="Uses simulated data, no API key needed.")
                    init_button = gr.Button("Initialize System", variant="primary")
                    init_status = gr.Textbox(label="System Status", interactive=False)

            with gr.Column(scale=3):
                with gr.Group():
                    problem_input = gr.Textbox(label="Problem Statement", placeholder="Enter a complex problem for the multi-agent system to analyze...", lines=3)
                    analyze_button = gr.Button("Run Analysis", variant="primary")
                    analysis_status = gr.Textbox(label="Analysis Status", interactive=False)

                with gr.Tabs():
                    with gr.TabItem("Dashboard"):
                        with gr.Row():
                            workflow_graph = gr.Plot(label="Agent Collaboration Network")
                        with gr.Row():
                            timeline_chart = gr.Plot(label="Task Execution Timeline")
                            performance_chart = gr.Plot(label="Performance Comparison")

                    with gr.TabItem("Generated Report"):
                        report_output = gr.HTML()

        init_button.click(
            fn=initialize_system,
            inputs=[api_key_input, model_select, demo_mode_checkbox],
            outputs=init_status
        )
        analyze_button.click(
            # run_analysis is a coroutine; running it via asyncio.run on Gradio's
            # worker thread gives it a private event loop and keeps the UI responsive.
            fn=lambda p: asyncio.run(run_analysis(p)),
            inputs=[problem_input],
            outputs=[analysis_status, workflow_graph, timeline_chart, performance_chart, report_output]
        )

    return interface


if __name__ == "__main__":
    app_interface = create_gradio_interface()
    app_interface.launch(show_error=True)