Spaces:
Running
Running
import gradio as gr | |
import requests | |
import json | |
import os | |
import asyncio | |
from datetime import datetime | |
from typing import Dict, List, Any, Optional, Tuple | |
from dotenv import load_dotenv | |
import time | |
import re | |
from collections import Counter | |
import threading | |
import queue | |
import uuid | |
from gradio_consilium_roundtable import consilium_roundtable | |
from research_tools.base_tool import BaseTool | |
from openfloor_helper.OpenFloorResearchAgent import OpenFloorResearchAgent | |
from openfloor_helper.OpenFloorAgentServer import OpenFloorAgentServer | |
from openfloor_helper.OpenFloorManager import OpenFloorManager | |
from openfloor import * | |
from openfloor.envelope import * | |
from enhanced_search_functions import ENHANCED_SEARCH_FUNCTIONS | |
# Load environment variables | |
load_dotenv() | |
# API Configuration - These will be updated by UI if needed | |
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY") | |
SAMBANOVA_API_KEY = os.getenv("SAMBANOVA_API_KEY") | |
MODERATOR_MODEL = os.getenv("MODERATOR_MODEL", "mistral") | |
# Session-based storage for isolated discussions | |
user_sessions: Dict[str, Dict] = {} | |
# Model Images | |
avatar_images = { | |
"Qwen3-32B": "https://cdn-avatars.huggingface.co/v1/production/uploads/620760a26e3b7210c2ff1943/-s1gyJfvbE1RgO5iBeNOi.png", | |
"DeepSeek-R1": "https://logosandtypes.com/wp-content/uploads/2025/02/deepseek.svg", | |
"Mistral Large": "https://logosandtypes.com/wp-content/uploads/2025/02/mistral-ai.svg", | |
"Meta-Llama-3.3-70B-Instruct": "https://registry.npmmirror.com/@lobehub/icons-static-png/1.46.0/files/dark/meta-color.png", | |
"arXiv Research Agent": "https://public.boxcloud.com/api/2.0/internal_files/804104772302/versions/860288648702/representations/png_paged_2048x2048/content/1.png?access_token=1!r4Iuj5vkFMywOMAPQ4M6QIr3eqkJ6CjlMzh77DAkRcGdVRvzG-Xh6GFZz_JkzoJuO9yRR5cQ6cs5VvUolhHxNM6JdliJ2JOi9VWm-BbB5C63s0_7bpaQYLFAJmLnlG2RzhX74_bK4XS-csGP8CI-9tVa6LUcrCNTKJmc-yddIepopLMZLqJ34h0nu69Yt0Not4yDErBTk2jWaneTBdhdXErOhCs9cz4HK-itpCfdL3Lze9oAjf6o8EVWRn6R0YPw95trQl7IziLd1P78BFuVaDjborvhs_yWgcw0uxXNZz_8WZh5z5NOvDq6sMo0uYGWiJ_g1JWyiaDJpsWBlHRiRwwF5FZLsVSXRz6dXD1MtKyOPs8J6CBYkGisicIysuiPsT1Kcyrgm-3jH1-tanOVs66TCmnGNbSYH_o_-x9iOdkI8rEL7-l2i5iHn22i-q8apZTOd_eQp22UCsmUBJQig7att_AwVKasmqOegDZHO2h1b_vSjeZ8ISBcg8i7fnFdF9Ej35s6OFkV5IyZtMzbAKdRlwdt5lupsshO5FCByR0kau9PVIiwJilI0t7zYsJtSXzVxVQEyEPuLTAlJJI7827NoNA1OSojaPsfhFrW4jEfJIgMoxNl_vFfZvLBmAA7Pk1SeaN7J0ebDji-bDbwqlPadp7JOB3s2Six11fm4Ss.&shared_link=https%3A%2F%2Fcornell.app.box.com%2Fv%2Farxiv-logomark-small-png&box_client_name=box-content-preview&box_client_version=3.7.0", | |
"GitHub Research Agent": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/c2/GitHub_Invertocat_Logo.svg/250px-GitHub_Invertocat_Logo.svg.png", | |
"SEC EDGAR Research Agent": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/1c/Seal_of_the_United_States_Securities_and_Exchange_Commission.svg/250px-Seal_of_the_United_States_Securities_and_Exchange_Commission.svg.png", | |
"Web Search Research Agent": "https://duckduckgo.com/static-assets/favicons/DDG-iOS-icon_76x76.png", | |
"Wikipedia Research Agent": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/80/Wikipedia-logo-v2.svg/103px-Wikipedia-logo-v2.svg.png" | |
} | |
def get_session_id(request: gr.Request = None) -> str: | |
"""Generate or retrieve session ID""" | |
if request and hasattr(request, 'session_hash'): | |
return request.session_hash | |
return str(uuid.uuid4()) | |
def get_or_create_session_state(session_id: str) -> Dict: | |
"""Get or create isolated session state""" | |
if session_id not in user_sessions: | |
user_sessions[session_id] = { | |
"roundtable_state": { | |
"participants": [], | |
"messages": [], | |
"currentSpeaker": None, | |
"thinking": [], | |
"showBubbles": [] | |
}, | |
"discussion_log": [], | |
"final_answer": "", | |
"api_keys": { | |
"mistral": None, | |
"sambanova": None | |
} | |
} | |
return user_sessions[session_id] | |
def update_session_api_keys(mistral_key, sambanova_key, session_id_state, request: gr.Request = None): | |
"""Update API keys for THIS SESSION ONLY""" | |
session_id = get_session_id(request) if not session_id_state else session_id_state | |
session = get_or_create_session_state(session_id) | |
status_messages = [] | |
# Update keys for THIS SESSION | |
if mistral_key.strip(): | |
session["api_keys"]["mistral"] = mistral_key.strip() | |
status_messages.append("β Mistral API key saved for this session") | |
elif MISTRAL_API_KEY: # Fall back to env var | |
session["api_keys"]["mistral"] = MISTRAL_API_KEY | |
status_messages.append("β Using Mistral API key from environment") | |
else: | |
status_messages.append("β No Mistral API key available") | |
if sambanova_key.strip(): | |
session["api_keys"]["sambanova"] = sambanova_key.strip() | |
status_messages.append("β SambaNova API key saved for this session") | |
elif SAMBANOVA_API_KEY: | |
session["api_keys"]["sambanova"] = SAMBANOVA_API_KEY | |
status_messages.append("β Using SambaNova API key from environment") | |
else: | |
status_messages.append("β No SambaNova API key available") | |
return " | ".join(status_messages), session_id | |
class VisualConsensusEngine: | |
def __init__(self, moderator_model: str = None, update_callback=None, session_id: str = None): | |
self.moderator_model = moderator_model or MODERATOR_MODEL | |
self.update_callback = update_callback | |
self.session_id = session_id | |
# Initialize OpenFloor Manager FIRST | |
self.floor_manager = OpenFloorManager(port=7860) | |
self.floor_manager.set_visual_callback(self.update_visual_state) | |
# Create OpenFloor research agents | |
from research_tools import WebSearchTool, WikipediaSearchTool, ArxivSearchTool, GitHubSearchTool, SECSearchTool | |
self.research_agents = { | |
'web_search': OpenFloorResearchAgent(WebSearchTool(), port=8001), | |
'wikipedia': OpenFloorResearchAgent(WikipediaSearchTool(), port=8002), | |
'arxiv': OpenFloorResearchAgent(ArxivSearchTool(), port=8003), | |
'github': OpenFloorResearchAgent(GitHubSearchTool(), port=8004), | |
'sec_edgar': OpenFloorResearchAgent(SECSearchTool(), port=8005) | |
} | |
# Start research agents and register with floor manager | |
self.start_openfloor_research_agents() | |
# Create a persistent conversation for this session | |
self.conversation_id = self.floor_manager.create_conversation([]) | |
# Available research agents for discovery | |
self.available_research_agents = list(self.research_agents.keys()) | |
# Get session-specific keys or fall back to global | |
session = get_or_create_session_state(session_id) if session_id else {"api_keys": {}} | |
session_keys = session.get("api_keys", {}) | |
mistral_key = session_keys.get("mistral") or MISTRAL_API_KEY | |
sambanova_key = session_keys.get("sambanova") or SAMBANOVA_API_KEY | |
self.models = { | |
'mistral': { | |
'name': 'Mistral Large', | |
'api_key': mistral_key, | |
'available': bool(mistral_key) | |
}, | |
'sambanova_deepseek': { | |
'name': 'DeepSeek-R1', | |
'api_key': sambanova_key, | |
'available': bool(sambanova_key) | |
}, | |
'sambanova_llama': { | |
'name': 'Meta-Llama-3.3-70B-Instruct', | |
'api_key': sambanova_key, | |
'available': bool(sambanova_key) | |
}, | |
'sambanova_qwen': { | |
'name': 'Qwen3-32B', | |
'api_key': sambanova_key, | |
'available': bool(sambanova_key) | |
} | |
} | |
# Store session keys for API calls | |
self.session_keys = { | |
'mistral': mistral_key, | |
'sambanova': sambanova_key | |
} | |
# Register AI experts with floor manager | |
self.register_ai_experts() | |
# Start floor manager service | |
self.floor_manager.start_floor_manager_service() | |
# PROFESSIONAL: Strong, expert role definitions matched to decision protocols | |
self.roles = { | |
'standard': "Provide expert analysis with clear reasoning and evidence.", | |
'expert_advocate': "You are a PASSIONATE EXPERT advocating for your specialized position. Present compelling evidence with conviction.", | |
'critical_analyst': "You are a RIGOROUS CRITIC. Identify flaws, risks, and weaknesses in arguments with analytical precision.", | |
'strategic_advisor': "You are a STRATEGIC ADVISOR. Focus on practical implementation, real-world constraints, and actionable insights.", | |
'research_specialist': "You are a RESEARCH EXPERT with deep domain knowledge. Provide authoritative analysis and evidence-based insights.", | |
'innovation_catalyst': "You are an INNOVATION EXPERT. Challenge conventional thinking and propose breakthrough approaches." | |
} | |
# PROFESSIONAL: Different prompt styles based on decision protocol | |
self.protocol_styles = { | |
'consensus': { | |
'intensity': 'collaborative', | |
'goal': 'finding common ground', | |
'language': 'respectful but rigorous' | |
}, | |
'majority_voting': { | |
'intensity': 'competitive', | |
'goal': 'winning the argument', | |
'language': 'passionate advocacy' | |
}, | |
'weighted_voting': { | |
'intensity': 'analytical', | |
'goal': 'demonstrating expertise', | |
'language': 'authoritative analysis' | |
}, | |
'ranked_choice': { | |
'intensity': 'comprehensive', | |
'goal': 'exploring all options', | |
'language': 'systematic evaluation' | |
}, | |
'unanimity': { | |
'intensity': 'diplomatic', | |
'goal': 'unanimous agreement', | |
'language': 'bridge-building dialogue' | |
} | |
} | |
def start_openfloor_research_agents(self): | |
"""Start research agents and register them with the floor manager""" | |
agent_ports = { | |
'web_search': 8001, | |
'wikipedia': 8002, | |
'arxiv': 8003, | |
'github': 8004, | |
'sec_edgar': 8005 | |
} | |
self.agent_servers = {} | |
for agent_name, port in agent_ports.items(): | |
agent = self.research_agents[agent_name] | |
server = OpenFloorAgentServer(agent, port) | |
if server.start_server(): | |
self.agent_servers[agent_name] = { | |
'server': server, | |
'port': port, | |
'url': f"http://localhost:{port}", | |
'manifest_url': f"http://localhost:{port}/openfloor/manifest" | |
} | |
# Register with floor manager | |
manifest = agent.get_manifest() | |
self.floor_manager.register_agent(manifest, f"http://localhost:{port}") | |
# Small delay between starting servers | |
time.sleep(0.5) | |
def register_ai_experts(self): | |
"""Register AI expert models as OpenFloor agents""" | |
for model_key, model_info in self.models.items(): | |
if model_info['available']: | |
# Create manifest for AI expert | |
expert_manifest = Manifest( | |
identification=Identification( | |
speakerUri=f"tag:consilium.ai,2025:{model_key}", | |
serviceUrl=f"internal://consilium/{model_key}", # Required parameter | |
conversationalName=model_info['name'], | |
role="AI Expert", | |
organization="Consilium Expert Panel", | |
synopsis=f"Expert AI model: {model_info['name']}" | |
), | |
capabilities=[ | |
Capability( | |
keyphrases=["analysis", "expertise", "reasoning", "decision"], | |
descriptions=[f"Expert analysis and reasoning by {model_info['name']}"], | |
languages=["en-us"] | |
) | |
] | |
) | |
# Register with floor manager (no URL since these are internal) | |
self.floor_manager.register_agent(expert_manifest, "internal://ai-expert") | |
def update_visual_state(self, state_update: Dict[str, Any]): | |
"""Update the visual roundtable state for this session""" | |
if self.update_callback: | |
self.update_callback(state_update) | |
def send_research_request_via_floor(self, function_name: str, query: str, requesting_expert: str) -> str: | |
"""Send research request through proper OpenFloor messaging""" | |
# Map function to research agent | |
function_to_agent = { | |
"search_web": "web_search", | |
"search_wikipedia": "wikipedia", | |
"search_academic": "arxiv", | |
"search_technology_trends": "github", | |
"search_financial_data": "sec_edgar" | |
} | |
if function_name not in function_to_agent: | |
return f"Unknown research function: {function_name}" | |
agent_name = function_to_agent[function_name] | |
research_agent = self.research_agents[agent_name] | |
target_speaker_uri = research_agent.manifest.identification.speakerUri | |
requesting_speaker_uri = f"tag:consilium.ai,2025:{requesting_expert}" | |
# Step 1: Invite research agent to conversation | |
success = self.floor_manager.invite_agent_to_conversation( | |
self.conversation_id, | |
target_speaker_uri, | |
requesting_speaker_uri | |
) | |
if not success: | |
return f"Failed to invite {agent_name} to conversation" | |
# Step 2: Send research request via floor manager | |
research_dialog = DialogEvent( | |
speakerUri=requesting_speaker_uri, | |
features={"text": TextFeature(values=[query])} | |
) | |
research_envelope = Envelope( | |
conversation=Conversation(id=self.conversation_id), | |
sender=Sender(speakerUri=requesting_speaker_uri), | |
events=[ | |
UtteranceEvent( | |
dialogEvent=research_dialog, | |
to=To(speakerUri=target_speaker_uri) | |
) | |
] | |
) | |
# Route through floor manager | |
routing_success = self.floor_manager.route_message(research_envelope) | |
if not routing_success: | |
return f"Failed to route research request to {agent_name}" | |
# Step 3: Wait for response and collect result | |
# In a real implementation, this would be asynchronous | |
# For now, we'll use the direct research call as fallback | |
try: | |
result = research_agent.tool.search(query) | |
# Step 4: Send result back through floor as ContextEvent | |
result_envelope = Envelope( | |
conversation=Conversation(id=self.conversation_id), | |
sender=Sender(speakerUri=target_speaker_uri), | |
events=[ | |
ContextEvent( | |
parameters={ | |
"research_function": function_name, | |
"query": query, | |
"requesting_expert": requesting_expert, | |
"result": result | |
} | |
) | |
] | |
) | |
self.floor_manager.route_message(result_envelope) | |
# Step 5: Remove research agent from conversation | |
self.dismiss_research_agent_via_floor(agent_name, requesting_expert) | |
return result | |
except Exception as e: | |
error_msg = f"Research error: {str(e)}" | |
# Send error through floor | |
error_envelope = Envelope( | |
conversation=Conversation(id=self.conversation_id), | |
sender=Sender(speakerUri=target_speaker_uri), | |
events=[ | |
ContextEvent( | |
parameters={ | |
"research_error": error_msg, | |
"function": function_name, | |
"query": query | |
} | |
) | |
] | |
) | |
self.floor_manager.route_message(error_envelope) | |
self.dismiss_research_agent_via_floor(agent_name, requesting_expert) | |
return error_msg | |
def dismiss_research_agent_via_floor(self, agent_name: str, requesting_expert: str): | |
"""Properly dismiss research agent via OpenFloor messaging""" | |
research_agent = self.research_agents[agent_name] | |
target_speaker_uri = research_agent.manifest.identification.speakerUri | |
# Send bye event | |
bye_envelope = Envelope( | |
conversation=Conversation(id=self.conversation_id), | |
sender=Sender(speakerUri=target_speaker_uri), | |
events=[ | |
ByeEvent( | |
parameters={ | |
"message": "Research task completed. Leaving conversation." | |
} | |
) | |
] | |
) | |
self.floor_manager.route_message(bye_envelope) | |
# Remove from conversation participants | |
if self.conversation_id in self.floor_manager.active_conversations: | |
conversation_state = self.floor_manager.active_conversations[self.conversation_id] | |
if target_speaker_uri in conversation_state['participants']: | |
conversation_state['participants'].remove(target_speaker_uri) | |
def handle_function_calls(self, completion, original_prompt: str, calling_model: str) -> str: | |
"""UNIFIED function call handler with enhanced research capabilities""" | |
# Check if completion is valid | |
if not completion or not completion.choices or len(completion.choices) == 0: | |
print(f"Invalid completion object for {calling_model}") | |
return "Analysis temporarily unavailable - invalid API response" | |
message = completion.choices[0].message | |
# If no function calls, return regular response | |
if not hasattr(message, 'tool_calls') or not message.tool_calls: | |
content = message.content | |
if isinstance(content, list): | |
text_parts = [] | |
for part in content: | |
if isinstance(part, dict) and 'text' in part: | |
text_parts.append(part['text']) | |
elif isinstance(part, str): | |
text_parts.append(part) | |
return ' '.join(text_parts) if text_parts else "Analysis completed" | |
elif isinstance(content, str): | |
return content | |
else: | |
return str(content) if content else "Analysis completed" | |
# Get the calling model's name for UI display | |
calling_model_name = self.models[calling_model]['name'] | |
# Process each function call | |
messages = [ | |
{"role": "user", "content": original_prompt}, | |
{ | |
"role": "assistant", | |
"content": message.content or "", | |
"tool_calls": message.tool_calls | |
} | |
] | |
for tool_call in message.tool_calls: | |
try: | |
function_name = tool_call.function.name | |
arguments = json.loads(tool_call.function.arguments) | |
query_param = arguments.get("query") or arguments.get("topic") or arguments.get("technology") or arguments.get("company") | |
if query_param: | |
session = get_or_create_session_state(self.session_id) | |
current_state = session["roundtable_state"] | |
all_messages = list(current_state.get("messages", [])) | |
# Add request message to the CALLING MODEL (Mistral) | |
request_message = { | |
"speaker": calling_model_name, | |
"text": f"π **Research Request**: {function_name.replace('_', ' ').title()}\nπ Query: \"{query_param}\"\nβ³ Waiting for research results...", | |
"type": "research_request" | |
} | |
all_messages.append(request_message) | |
existing_bubbles = list(current_state.get("showBubbles", [])) | |
if calling_model_name not in existing_bubbles: | |
existing_bubbles.append(calling_model_name) | |
self.update_visual_state({ | |
"participants": current_state.get("participants", []), | |
"messages": all_messages, | |
"currentSpeaker": calling_model_name, | |
"thinking": [], | |
"showBubbles": existing_bubbles | |
}) | |
time.sleep(1) | |
result = self._execute_research_function(function_name, arguments, calling_model_name) | |
# Ensure result is a string | |
if not isinstance(result, str): | |
result = str(result) | |
# Log the research activity (with access to session log function) | |
session = get_or_create_session_state(self.session_id) | |
def session_log_function(event_type, speaker="", content="", **kwargs): | |
session["discussion_log"].append({ | |
'type': event_type, | |
'speaker': speaker, | |
'content': content, | |
'timestamp': datetime.now().strftime('%H:%M:%S'), | |
**kwargs | |
}) | |
# Add function result to conversation | |
messages.append({ | |
"role": "tool", | |
"tool_call_id": tool_call.id, | |
"content": result | |
}) | |
except Exception as e: | |
print(f"Error processing tool call: {str(e)}") | |
messages.append({ | |
"role": "tool", | |
"tool_call_id": tool_call.id, | |
"content": f"Research error: {str(e)}" | |
}) | |
continue | |
# Continue conversation with research results integrated | |
try: | |
from openai import OpenAI | |
if calling_model == 'mistral': | |
client = OpenAI( | |
base_url="https://api.mistral.ai/v1", | |
api_key=self.session_keys.get('mistral') | |
) | |
model_name = 'mistral-large-latest' | |
else: | |
client = OpenAI( | |
base_url="https://api.sambanova.ai/v1", | |
api_key=self.session_keys.get('sambanova') | |
) | |
model_mapping = { | |
'sambanova_deepseek': 'DeepSeek-R1', | |
'sambanova_llama': 'Meta-Llama-3.3-70B-Instruct', | |
'sambanova_qwen': 'Qwen3-32B' | |
} | |
model_name = model_mapping.get(calling_model, 'Meta-Llama-3.3-70B-Instruct') | |
final_completion = client.chat.completions.create( | |
model=model_name, | |
messages=messages, | |
max_tokens=1000, | |
temperature=0.7 | |
) | |
if final_completion and final_completion.choices and len(final_completion.choices) > 0: | |
final_content = final_completion.choices[0].message.content | |
if isinstance(final_content, list): | |
text_parts = [] | |
for part in final_content: | |
if isinstance(part, dict) and 'text' in part: | |
text_parts.append(part['text']) | |
elif isinstance(part, str): | |
text_parts.append(part) | |
return ' '.join(text_parts) if text_parts else "Analysis completed with research integration." | |
elif isinstance(final_content, str): | |
return final_content | |
else: | |
return str(final_content) if final_content else "Analysis completed with research integration." | |
else: | |
return message.content or "Analysis completed with research integration." | |
except Exception as e: | |
print(f"Error in follow-up completion for {calling_model}: {str(e)}") | |
return message.content or "Analysis completed with research integration." | |
def _execute_research_function(self, function_name: str, arguments: dict, requesting_model_name: str = None) -> str: | |
"""Execute research function using proper OpenFloor messaging""" | |
query_param = arguments.get("query") or arguments.get("topic") or arguments.get("technology") or arguments.get("company") | |
if not query_param: | |
return "No query parameter found in research request" | |
# Show research starting | |
self.show_research_starting(function_name, query_param) | |
try: | |
# Use OpenFloor messaging for research | |
result = self.send_research_request_via_floor( | |
function_name, | |
query_param, | |
requesting_model_name or 'unknown' | |
) | |
# Show research complete | |
self.show_research_complete(function_name, query_param, len(result), requesting_model_name) | |
return result | |
except Exception as e: | |
error_msg = str(e) | |
self.show_research_error(function_name, query_param, error_msg, requesting_model_name) | |
return f"OpenFloor research error: {error_msg}" | |
def show_research_starting(self, function: str, query: str): | |
"""Invite specific research agent to join conversation""" | |
function_to_agent = { | |
"search_web": "web_search", | |
"search_wikipedia": "wikipedia", | |
"search_academic": "arxiv", | |
"search_technology_trends": "github", | |
"search_financial_data": "sec_edgar" | |
} | |
if function in function_to_agent: | |
agent_name = function_to_agent[function] | |
# Use the existing invite method | |
self.invite_research_agent(agent_name, "current_conversation", "AI Expert") | |
# Add the query information | |
research_agent = self.research_agents[agent_name] | |
agent_display_name = research_agent.manifest.identification.conversationalName | |
session = get_or_create_session_state(self.session_id) | |
current_state = session["roundtable_state"] | |
all_messages = list(current_state.get("messages", [])) | |
# Add research starting message | |
start_message = { | |
"speaker": agent_display_name, | |
"text": f"π **Starting Research**\nπ Query: \"{query}\"\nβ³ Connecting to data sources...", | |
"type": "research_starting" | |
} | |
all_messages.append(start_message) | |
existing_bubbles = list(current_state.get("showBubbles", [])) | |
self.update_visual_state({ | |
"participants": current_state.get("participants", []), | |
"messages": all_messages, | |
"currentSpeaker": agent_display_name, | |
"thinking": [], | |
"showBubbles": existing_bubbles | |
}) | |
def show_research_complete(self, function: str, query: str, result_length: int, requesting_model_name: str = None): | |
"""Show research complete and dismiss the specific agent""" | |
function_to_agent = { | |
"search_web": "web_search", | |
"search_wikipedia": "wikipedia", | |
"search_academic": "arxiv", | |
"search_technology_trends": "github", | |
"search_financial_data": "sec_edgar" | |
} | |
if function in function_to_agent: | |
agent_name = function_to_agent[function] | |
research_agent = self.research_agents[agent_name] | |
agent_display_name = research_agent.manifest.identification.conversationalName | |
session = get_or_create_session_state(self.session_id) | |
current_state = session["roundtable_state"] | |
all_messages = list(current_state.get("messages", [])) | |
# Show completion message | |
complete_message = { | |
"speaker": agent_display_name, | |
"text": f"β **Research Complete**\nπ {result_length:,} characters analyzed\nπ― Research delivered to {requesting_model_name}", | |
"type": "research_complete" | |
} | |
all_messages.append(complete_message) | |
existing_bubbles = list(current_state.get("showBubbles", [])) | |
self.update_visual_state({ | |
"participants": current_state.get("participants", []), | |
"messages": all_messages, | |
"currentSpeaker": requesting_model_name, | |
"thinking": [], | |
"showBubbles": existing_bubbles | |
}) | |
time.sleep(2) | |
# Use the existing dismiss method | |
self.dismiss_research_agent(agent_name, "current_conversation") | |
def show_research_error(self, function: str, query: str, error: str, requesting_model_name: str = None): | |
"""Show research error from the specific agent and dismiss it""" | |
function_to_agent = { | |
"search_web": "web_search", | |
"search_wikipedia": "wikipedia", | |
"search_academic": "arxiv", | |
"search_technology_trends": "github", | |
"search_financial_data": "sec_edgar" | |
} | |
if function in function_to_agent: | |
agent_name = function_to_agent[function] | |
research_agent = self.research_agents[agent_name] | |
agent_display_name = research_agent.manifest.identification.conversationalName | |
session = get_or_create_session_state(self.session_id) | |
current_state = session["roundtable_state"] | |
all_messages = list(current_state.get("messages", [])) | |
# Show error message from the specific agent | |
error_message = { | |
"speaker": agent_display_name, | |
"text": f"β **Research Error**\nπ¬ {function.replace('_', ' ').title()}\nπ Query: \"{query}\"\nβ οΈ Error: {error}\nπ Research failed, returning to discussion", | |
"type": "research_error" | |
} | |
all_messages.append(error_message) | |
existing_bubbles = list(current_state.get("showBubbles", [])) | |
self.update_visual_state({ | |
"participants": current_state.get("participants", []), | |
"messages": all_messages, | |
"currentSpeaker": requesting_model_name, | |
"thinking": [], | |
"showBubbles": existing_bubbles | |
}) | |
time.sleep(1) | |
# Dismiss the research agent since research failed | |
self.dismiss_research_agent(agent_name, "current_conversation") | |
def invite_research_agent(self, agent_name: str, conversation_id: str, requesting_expert: str): | |
"""Invite a research agent to join the conversation visually""" | |
if agent_name in self.research_agents: | |
research_agent = self.research_agents[agent_name] | |
# Research agent joins the conversation | |
research_agent.join_conversation(conversation_id) | |
# Update visual state to show the research agent joining | |
session = get_or_create_session_state(self.session_id) | |
current_state = session["roundtable_state"] | |
# Add research agent to participants if not already there | |
participants = list(current_state.get("participants", [])) | |
agent_display_name = research_agent.manifest.identification.conversationalName | |
if agent_display_name not in participants: | |
participants.append(agent_display_name) | |
# Show join message | |
all_messages = list(current_state.get("messages", [])) | |
join_message = { | |
"speaker": agent_display_name, | |
"text": f"π **Joined Conversation**\nInvited by: {requesting_expert}\nSpecialty: {research_agent.manifest.identification.synopsis}\nReady to provide research assistance.", | |
"type": "agent_join" | |
} | |
all_messages.append(join_message) | |
# Update visual state | |
existing_bubbles = list(current_state.get("showBubbles", [])) | |
if agent_display_name not in existing_bubbles: | |
existing_bubbles.append(agent_display_name) | |
self.update_visual_state({ | |
"participants": participants, | |
"messages": all_messages, | |
"currentSpeaker": None, | |
"thinking": [], | |
"showBubbles": existing_bubbles | |
}) | |
return True | |
return False | |
def dismiss_research_agent(self, agent_name: str, conversation_id: str): | |
"""Remove a research agent from the conversation visually""" | |
if agent_name in self.research_agents: | |
research_agent = self.research_agents[agent_name] | |
# Research agent leaves the conversation | |
research_agent.leave_conversation(conversation_id) | |
# Update visual state | |
session = get_or_create_session_state(self.session_id) | |
current_state = session["roundtable_state"] | |
agent_display_name = research_agent.manifest.identification.conversationalName | |
# Show leave message | |
all_messages = list(current_state.get("messages", [])) | |
leave_message = { | |
"speaker": agent_display_name, | |
"text": f"π **Leaving Conversation**\nResearch assistance complete. Agent dismissed.", | |
"type": "agent_leave" | |
} | |
all_messages.append(leave_message) | |
# Remove from bubbles but keep in participants list for history | |
existing_bubbles = list(current_state.get("showBubbles", [])) | |
if agent_display_name in existing_bubbles: | |
existing_bubbles.remove(agent_display_name) | |
self.update_visual_state({ | |
"participants": current_state.get("participants", []), | |
"messages": all_messages, | |
"currentSpeaker": None, | |
"thinking": [], | |
"showBubbles": existing_bubbles | |
}) | |
return True | |
return False | |
def call_model(self, model: str, prompt: str, context: str = "") -> Optional[str]: | |
"""Enhanced model calling with native function calling support""" | |
if not self.models[model]['available']: | |
print(f"Model {model} not available - missing API key") | |
return None | |
full_prompt = f"{context}\n\n{prompt}" if context else prompt | |
try: | |
if model == 'mistral': | |
return self._call_mistral(full_prompt) | |
elif model.startswith('sambanova_'): | |
return self._call_sambanova(model, full_prompt) | |
except Exception as e: | |
print(f"Error calling {model}: {str(e)}") | |
return None | |
return None | |
def _call_sambanova(self, model: str, prompt: str) -> Optional[str]: | |
"""Enhanced SambaNova API call with native function calling""" | |
api_key = self.session_keys.get('sambanova') | |
if not api_key: | |
print(f"No SambaNova API key available for {model}") | |
return None | |
try: | |
from openai import OpenAI | |
client = OpenAI( | |
base_url="https://api.sambanova.ai/v1", | |
api_key=api_key | |
) | |
model_mapping = { | |
'sambanova_deepseek': 'DeepSeek-R1', | |
'sambanova_llama': 'Meta-Llama-3.3-70B-Instruct', | |
'sambanova_qwen': 'Qwen3-32B' | |
} | |
sambanova_model = model_mapping.get(model, 'Meta-Llama-3.3-70B-Instruct') | |
print(f"Calling SambaNova model: {sambanova_model}") | |
# Check if model supports function calling (Updated list) | |
supports_functions = sambanova_model in [ | |
'Meta-Llama-3.1-8B-Instruct', | |
'Meta-Llama-3.1-405B-Instruct', | |
'Meta-Llama-3.3-70B-Instruct' | |
] | |
if supports_functions: | |
completion = client.chat.completions.create( | |
model=sambanova_model, | |
messages=[{"role": "user", "content": prompt}], | |
tools=ENHANCED_SEARCH_FUNCTIONS, | |
tool_choice="auto", | |
max_tokens=1000, | |
temperature=0.7 | |
) | |
else: | |
# Qwen3-32B and other models that don't support function calling | |
print(f"Model {sambanova_model} doesn't support function calling - using regular completion") | |
completion = client.chat.completions.create( | |
model=sambanova_model, | |
messages=[{"role": "user", "content": prompt}], | |
max_tokens=1000, | |
temperature=0.7 | |
) | |
# Handle function calls if present (only for models that support it) | |
if supports_functions: | |
return self.handle_function_calls(completion, prompt, model) | |
else: | |
# For models without function calling, return response directly | |
if completion and completion.choices and len(completion.choices) > 0: | |
return completion.choices[0].message.content | |
else: | |
return None | |
except Exception as e: | |
print(f"Error calling SambaNova {model} ({sambanova_model}): {str(e)}") | |
# Print more detailed error info | |
import traceback | |
traceback.print_exc() | |
return None | |
def _call_mistral(self, prompt: str) -> Optional[str]: | |
"""Enhanced Mistral API call with native function calling""" | |
api_key = self.session_keys.get('mistral') | |
if not api_key: | |
print("No Mistral API key available") | |
return None | |
try: | |
from openai import OpenAI | |
client = OpenAI( | |
base_url="https://api.mistral.ai/v1", | |
api_key=api_key | |
) | |
print("Calling Mistral model: mistral-large-latest") | |
completion = client.chat.completions.create( | |
model='mistral-large-latest', | |
messages=[{"role": "user", "content": prompt}], | |
tools=ENHANCED_SEARCH_FUNCTIONS, | |
tool_choice="auto", | |
max_tokens=1000, | |
temperature=0.7 | |
) | |
# Check if we got a valid response | |
if not completion or not completion.choices or len(completion.choices) == 0: | |
print("Invalid response structure from Mistral") | |
return None | |
# Handle function calls if present | |
return self.handle_function_calls(completion, prompt, 'mistral') | |
except Exception as e: | |
print(f"Error calling Mistral API: {str(e)}") | |
import traceback | |
traceback.print_exc() | |
return None | |
def assign_roles(self, models: List[str], role_assignment: str) -> Dict[str, str]: | |
"""Assign expert roles for rigorous analysis""" | |
if role_assignment == "none": | |
return {model: "standard" for model in models} | |
roles_to_assign = [] | |
if role_assignment == "balanced": | |
roles_to_assign = ["expert_advocate", "critical_analyst", "strategic_advisor", "research_specialist"] | |
elif role_assignment == "specialized": | |
roles_to_assign = ["research_specialist", "strategic_advisor", "innovation_catalyst", "expert_advocate"] | |
elif role_assignment == "adversarial": | |
roles_to_assign = ["critical_analyst", "innovation_catalyst", "expert_advocate", "strategic_advisor"] | |
while len(roles_to_assign) < len(models): | |
roles_to_assign.append("standard") | |
model_roles = {} | |
for i, model in enumerate(models): | |
model_roles[model] = roles_to_assign[i % len(roles_to_assign)] | |
return model_roles | |
def _extract_confidence(self, response: str) -> float: | |
"""Extract confidence score from response""" | |
if not response or not isinstance(response, str): | |
return 5.0 | |
confidence_match = re.search(r'Confidence:\s*(\d+(?:\.\d+)?)', response) | |
if confidence_match: | |
try: | |
return float(confidence_match.group(1)) | |
except ValueError: | |
pass | |
return 5.0 | |
def build_position_summary(self, all_messages: List[Dict], current_model: str, topology: str = "full_mesh") -> str: | |
"""Build expert position summary for analysis""" | |
current_model_name = self.models[current_model]['name'] | |
if topology == "full_mesh": | |
# Show latest position from each expert | |
latest_positions = {} | |
for msg in all_messages: | |
if msg["speaker"] != current_model_name and not msg["speaker"].endswith("Research Agent"): | |
latest_positions[msg["speaker"]] = { | |
'text': msg['text'][:150] + "..." if len(msg['text']) > 150 else msg['text'], | |
'confidence': msg.get('confidence', 5) | |
} | |
summary = "EXPERT POSITIONS:\n" | |
for speaker, pos in latest_positions.items(): | |
summary += f"β’ **{speaker}**: {pos['text']} (Confidence: {pos['confidence']}/10)\n" | |
elif topology == "star": | |
# Only show moderator's latest position | |
moderator_name = self.models[self.moderator_model]['name'] | |
summary = "MODERATOR ANALYSIS:\n" | |
for msg in reversed(all_messages): | |
if msg["speaker"] == moderator_name: | |
text = msg['text'][:200] + "..." if len(msg['text']) > 200 else msg['text'] | |
summary += f"β’ **{moderator_name}**: {text}\n" | |
break | |
elif topology == "ring": | |
# Only show previous expert's position | |
available_models = [model for model, info in self.models.items() if info['available']] | |
current_idx = available_models.index(current_model) | |
prev_idx = (current_idx - 1) % len(available_models) | |
prev_model_name = self.models[available_models[prev_idx]]['name'] | |
summary = "PREVIOUS EXPERT:\n" | |
for msg in reversed(all_messages): | |
if msg["speaker"] == prev_model_name: | |
text = msg['text'][:200] + "..." if len(msg['text']) > 200 else msg['text'] | |
summary += f"β’ **{prev_model_name}**: {text}\n" | |
break | |
return summary | |
def run_visual_consensus_session(self, question: str, discussion_rounds: int = 3, | |
decision_protocol: str = "consensus", role_assignment: str = "balanced", | |
topology: str = "full_mesh", moderator_model: str = "mistral", | |
log_function=None): | |
"""Run expert consensus with protocol-appropriate intensity and Research Agent integration""" | |
available_models = [model for model, info in self.models.items() if info['available']] | |
if not available_models: | |
return "β No AI models available" | |
# Add AI experts to the conversation | |
for model in available_models: | |
expert_speaker_uri = f"tag:consilium.ai,2025:{model}" | |
if self.conversation_id in self.floor_manager.active_conversations: | |
conversation_state = self.floor_manager.active_conversations[self.conversation_id] | |
if expert_speaker_uri not in conversation_state['participants']: | |
conversation_state['participants'].append(expert_speaker_uri) | |
# Send conversation start event through floor | |
start_envelope = Envelope( | |
conversation=Conversation(id=self.conversation_id), | |
sender=Sender(speakerUri="tag:consilium.ai,2025:session-manager"), | |
events=[ | |
ContextEvent( | |
parameters={ | |
"session_start": True, | |
"question": question, | |
"protocol": decision_protocol, | |
"participants": [self.models[model]['name'] for model in available_models] | |
} | |
) | |
] | |
) | |
self.floor_manager.route_message(start_envelope) | |
model_roles = self.assign_roles(available_models, role_assignment) | |
visual_participant_names = [self.models[model]['name'] for model in available_models] | |
# Get protocol-appropriate style | |
protocol_style = self.protocol_styles.get(decision_protocol, self.protocol_styles['consensus']) | |
# Use session-specific logging | |
def log_event(event_type: str, speaker: str = "", content: str = "", **kwargs): | |
if log_function: | |
log_function(event_type, speaker, content, **kwargs) | |
# Log the start | |
log_event('phase', content=f"π― Starting Expert Analysis: {question}") | |
log_event('phase', content=f"π Configuration: {len(available_models)} experts, {decision_protocol} protocol, {role_assignment} roles, {topology} topology") | |
self.update_visual_state({ | |
"participants": visual_participant_names, | |
"messages": [], | |
"currentSpeaker": None, | |
"thinking": [], | |
"showBubbles": [], | |
"avatarImages": avatar_images | |
}) | |
all_messages = [] | |
log_event('phase', content="π Phase 1: Expert Initial Analysis") | |
for model in available_models: | |
# Log and set thinking state - PRESERVE BUBBLES | |
log_event('thinking', speaker=self.models[model]['name']) | |
session = get_or_create_session_state(self.session_id) | |
current_state = session["roundtable_state"] | |
existing_bubbles = list(current_state.get("showBubbles", [])) | |
self.update_visual_state({ | |
"participants": visual_participant_names, | |
"messages": all_messages, | |
"currentSpeaker": None, | |
"thinking": [self.models[model]['name']], | |
"showBubbles": existing_bubbles, | |
"avatarImages": avatar_images | |
}) | |
time.sleep(1) | |
role = model_roles[model] | |
role_context = self.roles[role] | |
# PROTOCOL-ADAPTED: Prompt intensity based on decision protocol | |
if decision_protocol in ['majority_voting', 'ranked_choice']: | |
intensity_prompt = "π― CRITICAL DECISION" | |
action_prompt = "Take a STRONG, CLEAR position and defend it with compelling evidence" | |
stakes = "This decision has major consequences - be decisive and convincing" | |
elif decision_protocol == 'consensus': | |
intensity_prompt = "π€ COLLABORATIVE ANALYSIS" | |
action_prompt = "Provide thorough analysis while remaining open to other perspectives" | |
stakes = "Work toward building understanding and finding common ground" | |
else: # weighted_voting, unanimity | |
intensity_prompt = "π¬ EXPERT ANALYSIS" | |
action_prompt = "Provide authoritative analysis with detailed reasoning" | |
stakes = "Your expertise and evidence quality will determine influence" | |
prompt = f"""{intensity_prompt}: {question} | |
Your Role: {role_context} | |
ANALYSIS REQUIREMENTS: | |
- {action_prompt} | |
- {stakes} | |
- Use specific examples, data, and evidence | |
- If you need current information or research, you can search the web, Wikipedia, academic papers, technology trends, or financial data | |
- Maximum 200 words of focused analysis | |
- End with "Position: [YOUR CLEAR STANCE]" and "Confidence: X/10" | |
Provide your expert analysis:""" | |
# Log and set speaking state - PRESERVE BUBBLES | |
log_event('speaking', speaker=self.models[model]['name']) | |
# Calculate existing bubbles | |
existing_bubbles = list(current_state.get("showBubbles", [])) | |
self.update_visual_state({ | |
"participants": visual_participant_names, | |
"messages": all_messages, | |
"currentSpeaker": self.models[model]['name'], | |
"thinking": [], | |
"showBubbles": existing_bubbles, | |
"avatarImages": avatar_images | |
}) | |
time.sleep(2) | |
# Call model - may trigger function calls and Research Agent activation | |
response = self.call_model(model, prompt) | |
# CRITICAL: Ensure response is a string | |
if response and not isinstance(response, str): | |
response = str(response) | |
if response: | |
confidence = self._extract_confidence(response) | |
message = { | |
"speaker": self.models[model]['name'], | |
"text": response, | |
"confidence": confidence, | |
"role": role | |
} | |
all_messages.append(message) | |
# Log the full response | |
log_event('message', | |
speaker=self.models[model]['name'], | |
content=response, | |
role=role, | |
confidence=confidence) | |
else: | |
# Handle failed API call gracefully | |
log_event('message', | |
speaker=self.models[model]['name'], | |
content="Analysis temporarily unavailable - API connection failed", | |
role=role, | |
confidence=0) | |
message = { | |
"speaker": self.models[model]['name'], | |
"text": "β οΈ Analysis temporarily unavailable - API connection failed. Please check your API keys and try again.", | |
"confidence": 0, | |
"role": role | |
} | |
all_messages.append(message) | |
# Update with new message | |
responded_speakers = list(set(msg["speaker"] for msg in all_messages if msg.get("speaker") and not msg["speaker"].endswith("Research Agent"))) | |
self.update_visual_state({ | |
"participants": visual_participant_names, | |
"messages": all_messages, | |
"currentSpeaker": None, | |
"thinking": [], | |
"showBubbles": responded_speakers, | |
"avatarImages": avatar_images | |
}) | |
time.sleep(2) # Longer pause to see the response | |
# Phase 2: Rigorous discussion rounds | |
if discussion_rounds > 0: | |
log_event('phase', content=f"π¬ Phase 2: Expert Discussion ({discussion_rounds} rounds)") | |
for round_num in range(discussion_rounds): | |
log_event('phase', content=f"π Expert Round {round_num + 1}") | |
for model in available_models: | |
# Log thinking with preserved bubbles | |
log_event('thinking', speaker=self.models[model]['name']) | |
existing_bubbles = list(current_state.get("showBubbles", [])) | |
self.update_visual_state({ | |
"participants": visual_participant_names, | |
"messages": all_messages, | |
"currentSpeaker": None, | |
"thinking": [self.models[model]['name']], | |
"showBubbles": existing_bubbles, | |
"avatarImages": avatar_images | |
}) | |
time.sleep(1) | |
# Build expert position summary | |
position_summary = self.build_position_summary(all_messages, model, topology) | |
role = model_roles[model] | |
role_context = self.roles[role] | |
# PROTOCOL-ADAPTED: Discussion intensity based on protocol | |
if decision_protocol in ['majority_voting', 'ranked_choice']: | |
discussion_style = "DEFEND your position and CHALLENGE weak arguments" | |
discussion_goal = "Prove why your approach is superior" | |
elif decision_protocol == 'consensus': | |
discussion_style = "BUILD on other experts' insights and ADDRESS concerns" | |
discussion_goal = "Work toward a solution everyone can support" | |
else: | |
discussion_style = "REFINE your analysis and RESPOND to other experts" | |
discussion_goal = "Demonstrate the strength of your reasoning" | |
discussion_prompt = f"""π Expert Round {round_num + 1}: {question} | |
Your Role: {role_context} | |
{position_summary} | |
DISCUSSION FOCUS: | |
- {discussion_style} | |
- {discussion_goal} | |
- Address specific points raised by other experts | |
- Use current data and research if needed | |
- Maximum 180 words of focused response | |
- End with "Position: [UNCHANGED/EVOLVED]" and "Confidence: X/10" | |
Your expert response:""" | |
# Log speaking with preserved bubbles | |
log_event('speaking', speaker=self.models[model]['name']) | |
existing_bubbles = list(current_state.get("showBubbles", [])) | |
self.update_visual_state({ | |
"participants": visual_participant_names, | |
"messages": all_messages, | |
"currentSpeaker": self.models[model]['name'], | |
"thinking": [], | |
"showBubbles": existing_bubbles, | |
"avatarImages": avatar_images | |
}) | |
time.sleep(2) | |
response = self.call_model(model, discussion_prompt) | |
if response: | |
confidence = self._extract_confidence(response) | |
message = { | |
"speaker": self.models[model]['name'], | |
"text": f"Round {round_num + 1}: {response}", | |
"confidence": confidence, | |
"role": model_roles[model] | |
} | |
all_messages.append(message) | |
log_event('message', | |
speaker=self.models[model]['name'], | |
content=f"Round {round_num + 1}: {response}", | |
role=model_roles[model], | |
confidence=confidence) | |
else: | |
# Handle failed API call gracefully | |
log_event('message', | |
speaker=self.models[model]['name'], | |
content=f"Round {round_num + 1}: Analysis temporarily unavailable - API connection failed", | |
role=model_roles[model], | |
confidence=0) | |
message = { | |
"speaker": self.models[model]['name'], | |
"text": f"Round {round_num + 1}: β οΈ Analysis temporarily unavailable - API connection failed.", | |
"confidence": 0, | |
"role": model_roles[model] | |
} | |
all_messages.append(message) | |
# Update visual state | |
responded_speakers = list(set(msg["speaker"] for msg in all_messages if msg.get("speaker") and not msg["speaker"].endswith("Research Agent"))) | |
self.update_visual_state({ | |
"participants": visual_participant_names, | |
"messages": all_messages, | |
"currentSpeaker": None, | |
"thinking": [], | |
"showBubbles": responded_speakers, | |
"avatarImages": avatar_images | |
}) | |
time.sleep(1) | |
# Phase 3: PROTOCOL-SPECIFIC final decision | |
if decision_protocol == 'consensus': | |
phase_name = "π€ Phase 3: Building Consensus" | |
moderator_title = "Senior Advisor" | |
elif decision_protocol in ['majority_voting', 'ranked_choice']: | |
phase_name = "βοΈ Phase 3: Final Decision" | |
moderator_title = "Lead Analyst" | |
else: | |
phase_name = "π Phase 3: Expert Synthesis" | |
moderator_title = "Lead Researcher" | |
log_event('phase', content=f"{phase_name} - {decision_protocol}") | |
log_event('thinking', speaker="All experts", content="Synthesizing final recommendation...") | |
expert_names = [self.models[model]['name'] for model in available_models] | |
# Preserve existing bubbles during final thinking | |
existing_bubbles = list(current_state.get("showBubbles", [])) | |
self.update_visual_state({ | |
"participants": visual_participant_names, | |
"messages": all_messages, | |
"currentSpeaker": None, | |
"thinking": expert_names, | |
"showBubbles": existing_bubbles, | |
"avatarImages": avatar_images | |
}) | |
time.sleep(2) | |
# Generate PROTOCOL-APPROPRIATE final analysis | |
moderator = self.moderator_model if self.models[self.moderator_model]['available'] else available_models[0] | |
# Build expert summary | |
final_positions = {} | |
confidence_scores = [] | |
# Get list of all research agent names | |
research_agent_names = [agent.manifest.identification.conversationalName for agent in self.research_agents.values()] | |
for msg in all_messages: | |
speaker = msg["speaker"] | |
if (speaker not in [moderator_title, 'Consilium'] and | |
speaker not in research_agent_names): | |
if speaker not in final_positions: | |
final_positions[speaker] = [] | |
final_positions[speaker].append(msg) | |
if 'confidence' in msg: | |
confidence_scores.append(msg['confidence']) | |
# Create PROFESSIONAL expert summary | |
expert_summary = f"π― EXPERT ANALYSIS: {question}\n\nFINAL EXPERT POSITIONS:\n" | |
for speaker, messages in final_positions.items(): | |
latest_msg = messages[-1] | |
role = latest_msg.get('role', 'standard') | |
# Extract the core argument | |
core_argument = latest_msg['text'][:200] + "..." if len(latest_msg['text']) > 200 else latest_msg['text'] | |
confidence = latest_msg.get('confidence', 5) | |
expert_summary += f"\nπ **{speaker}** ({role}):\n{core_argument}\nFinal Confidence: {confidence}/10\n" | |
avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 5.0 | |
# PROTOCOL-SPECIFIC synthesis prompt | |
if decision_protocol == 'consensus': | |
synthesis_goal = "Build a CONSENSUS recommendation that all experts can support" | |
synthesis_format = "**CONSENSUS REACHED:** [Yes/Partial/No]\n**RECOMMENDED APPROACH:** [Synthesis]\n**AREAS OF AGREEMENT:** [Common ground]\n**REMAINING CONCERNS:** [Issues to address]" | |
elif decision_protocol in ['majority_voting', 'ranked_choice']: | |
synthesis_goal = "Determine the STRONGEST position and declare a clear winner" | |
synthesis_format = "**DECISION:** [Clear recommendation]\n**WINNING ARGUMENT:** [Most compelling case]\n**KEY EVIDENCE:** [Supporting data]\n**IMPLEMENTATION:** [Next steps]" | |
else: | |
synthesis_goal = "Synthesize expert insights into actionable recommendations" | |
synthesis_format = "**ANALYSIS CONCLUSION:** [Summary]\n**RECOMMENDED APPROACH:** [Best path forward]\n**RISK ASSESSMENT:** [Key considerations]\n**CONFIDENCE LEVEL:** [Overall certainty]" | |
consensus_prompt = f"""{expert_summary} | |
π SENIOR ANALYSIS REQUIRED: | |
{synthesis_goal} | |
SYNTHESIS REQUIREMENTS: | |
- Analyze the quality and strength of each expert position | |
- Identify areas where experts align vs disagree | |
- Provide a clear, actionable recommendation | |
- Use additional research if needed to resolve disagreements | |
- Maximum 300 words of decisive analysis | |
Average Expert Confidence: {avg_confidence:.1f}/10 | |
Protocol: {decision_protocol} | |
Format: | |
{synthesis_format} | |
Provide your synthesis:""" | |
log_event('speaking', speaker=moderator_title, content="Synthesizing expert analysis into final recommendation...") | |
# Preserve existing bubbles during final speaking | |
existing_bubbles = list(current_state.get("showBubbles", [])) | |
self.update_visual_state({ | |
"participants": visual_participant_names, | |
"messages": all_messages, | |
"currentSpeaker": "Consilium", | |
"thinking": [], | |
"showBubbles": existing_bubbles, | |
"avatarImages": avatar_images | |
}) | |
# Call moderator model - may also trigger function calls | |
consensus_result = self.call_model(moderator, consensus_prompt) | |
if not consensus_result: | |
consensus_result = f"""**ANALYSIS INCOMPLETE:** Technical difficulties prevented full synthesis. | |
**RECOMMENDED APPROACH:** Manual review of expert positions required. | |
**KEY CONSIDERATIONS:** All expert inputs should be carefully evaluated. | |
**NEXT STEPS:** Retry analysis or conduct additional expert consultation.""" | |
# Determine result quality based on protocol | |
if decision_protocol == 'consensus': | |
if "CONSENSUS REACHED: Yes" in consensus_result or avg_confidence >= 7.5: | |
visual_summary = "β Expert Consensus Achieved" | |
elif "Partial" in consensus_result: | |
visual_summary = "β οΈ Partial Consensus - Some Expert Disagreement" | |
else: | |
visual_summary = "π€ No Consensus - Significant Expert Disagreement" | |
elif decision_protocol in ['majority_voting', 'ranked_choice']: | |
if any(word in consensus_result.upper() for word in ["DECISION:", "WINNING", "RECOMMEND"]): | |
visual_summary = "βοΈ Clear Expert Recommendation" | |
else: | |
visual_summary = "π€ Expert Analysis Complete" | |
else: | |
visual_summary = "π Expert Analysis Complete" | |
final_message = { | |
"speaker": moderator_title, | |
"text": f"{visual_summary}\n\n{consensus_result}", | |
"confidence": avg_confidence, | |
"role": "moderator" | |
} | |
all_messages.append(final_message) | |
log_event('message', | |
speaker=moderator_title, | |
content=consensus_result, | |
confidence=avg_confidence) | |
responded_speakers = list(set(msg["speaker"] for msg in all_messages if msg.get("speaker") and not msg["speaker"].endswith("Research Agent"))) | |
self.update_visual_state({ | |
"participants": visual_participant_names, | |
"messages": all_messages, | |
"currentSpeaker": None, | |
"thinking": [], | |
"showBubbles": responded_speakers, | |
"avatarImages": avatar_images | |
}) | |
log_event('phase', content="β Expert Analysis Complete") | |
return consensus_result | |
def update_session_roundtable_state(session_id: str, new_state: Dict): | |
"""Update roundtable state for specific session""" | |
session = get_or_create_session_state(session_id) | |
session["roundtable_state"].update(new_state) | |
return json.dumps(session["roundtable_state"]) | |
def run_consensus_discussion_session(question: str, discussion_rounds: int = 3, | |
decision_protocol: str = "consensus", role_assignment: str = "balanced", | |
topology: str = "full_mesh", moderator_model: str = "mistral", | |
session_id_state: str = None, | |
request: gr.Request = None): | |
"""Session-isolated expert consensus discussion""" | |
# Get unique session | |
session_id = get_session_id(request) if not session_id_state else session_id_state | |
session = get_or_create_session_state(session_id) | |
# Reset session state for new discussion | |
session["discussion_log"] = [] | |
session["final_answer"] = "" | |
def session_visual_update_callback(state_update): | |
"""Session-specific visual update callback""" | |
update_session_roundtable_state(session_id, state_update) | |
def session_log_event(event_type: str, speaker: str = "", content: str = "", **kwargs): | |
"""Add event to THIS session's log only""" | |
session["discussion_log"].append({ | |
'type': event_type, | |
'speaker': speaker, | |
'content': content, | |
'timestamp': datetime.now().strftime('%H:%M:%S'), | |
**kwargs | |
}) | |
# Create engine with session-specific callback | |
engine = VisualConsensusEngine(moderator_model, session_visual_update_callback, session_id) | |
# Run consensus with session-specific logging | |
result = engine.run_visual_consensus_session( | |
question, discussion_rounds, decision_protocol, | |
role_assignment, topology, moderator_model, | |
session_log_event | |
) | |
# Generate session-specific final answer | |
available_models = [model for model, info in engine.models.items() if info['available']] | |
session["final_answer"] = f"""## π― Expert Analysis Results | |
{result} | |
--- | |
### π Analysis Summary | |
- **Question:** {question} | |
- **Protocol:** {decision_protocol.replace('_', ' ').title()} | |
- **Topology:** {topology.replace('_', ' ').title()} | |
- **Experts:** {len(available_models)} AI specialists | |
- **Roles:** {role_assignment.title()} | |
- **Research Integration:** Native function calling with live data | |
- **Session ID:** {session_id[:3]}... | |
*Generated by Consilium: Multi-AI Expert Consensus Platform*""" | |
# Format session-specific discussion log | |
formatted_log = format_session_discussion_log(session["discussion_log"]) | |
return ("β Expert Analysis Complete - See results below", | |
json.dumps(session["roundtable_state"]), | |
session["final_answer"], | |
formatted_log, | |
session_id) | |
def format_session_discussion_log(discussion_log: list) -> str: | |
"""Format discussion log for specific session""" | |
if not discussion_log: | |
return "No discussion log available yet." | |
formatted_log = "# π Complete Expert Discussion Log\n\n" | |
for entry in discussion_log: | |
timestamp = entry.get('timestamp', datetime.now().strftime('%H:%M:%S')) | |
if entry['type'] == 'thinking': | |
formatted_log += f"**{timestamp}** π€ **{entry['speaker']}** is analyzing...\n\n" | |
elif entry['type'] == 'speaking': | |
formatted_log += f"**{timestamp}** π¬ **{entry['speaker']}** is presenting...\n\n" | |
elif entry['type'] == 'message': | |
formatted_log += f"**{timestamp}** π **{entry['speaker']}** ({entry.get('role', 'standard')}):\n" | |
formatted_log += f"> {entry['content']}\n" | |
if 'confidence' in entry: | |
formatted_log += f"*Confidence: {entry['confidence']}/10*\n\n" | |
else: | |
formatted_log += "\n" | |
elif entry['type'] == 'research_request': | |
function_name = entry.get('function', 'Unknown') | |
query = entry.get('query', 'Unknown query') | |
requesting_expert = entry.get('requesting_expert', 'Unknown expert') | |
formatted_log += f"**{timestamp}** π **Research Agent** - Research Request:\n" | |
formatted_log += f"> **Function:** {function_name.replace('_', ' ').title()}\n" | |
formatted_log += f"> **Query:** \"{query}\"\n" | |
formatted_log += f"> **Requested by:** {requesting_expert}\n\n" | |
elif entry['type'] == 'research_result': | |
function_name = entry.get('function', 'Unknown') | |
query = entry.get('query', 'Unknown query') | |
requesting_expert = entry.get('requesting_expert', 'Unknown expert') | |
full_result = entry.get('full_result', entry.get('content', 'No result')) | |
formatted_log += f"**{timestamp}** π **Research Agent** - Research Results:\n" | |
formatted_log += f"> **Function:** {function_name.replace('_', ' ').title()}\n" | |
formatted_log += f"> **Query:** \"{query}\"\n" | |
formatted_log += f"> **For Expert:** {requesting_expert}\n\n" | |
formatted_log += f"**Research Results:**\n" | |
formatted_log += f"```\n{full_result}\n```\n\n" | |
elif entry['type'] == 'phase': | |
formatted_log += f"\n---\n## {entry['content']}\n---\n\n" | |
return formatted_log | |
def check_model_status_session(session_id_state: str = None, request: gr.Request = None): | |
"""Check and display current model availability for specific session""" | |
session_id = get_session_id(request) if not session_id_state else session_id_state | |
session = get_or_create_session_state(session_id) | |
session_keys = session.get("api_keys", {}) | |
# Get session-specific keys or fall back to env vars | |
mistral_key = session_keys.get("mistral") or MISTRAL_API_KEY | |
sambanova_key = session_keys.get("sambanova") or SAMBANOVA_API_KEY | |
status_info = "## π Expert Model Availability\n\n" | |
models = { | |
'Mistral Large': mistral_key, | |
'DeepSeek-R1': sambanova_key, | |
'Meta-Llama-3.3-70B-Instruct': sambanova_key, | |
'Qwen3-32B': sambanova_key | |
} | |
for model_name, available in models.items(): | |
if available: | |
status = f"β Available (Key: {available[:3]}...)" | |
else: | |
status = "β Not configured" | |
status_info += f"**{model_name}:** {status}\n\n" | |
return status_info | |
# Create the professional interface | |
with gr.Blocks(title="π Consilium: Multi-AI Expert Consensus Platform - OFP (Open Floor Protocol) Version", theme=gr.themes.Soft()) as demo: | |
gr.Markdown(""" | |
# π Consilium: Multi-AI Expert Consensus Platform - OFP (Open Floor Protocol) Version | |
**Watch expert AI models collaborate with live research to solve your most complex decisions** | |
### π Features: | |
* Visual roundtable of the AI models, including speech bubbles to see the discussion in real time. | |
* Includes Mistral (**mistral-large-latest**) via their API and the Models **DeepSeek-R1**, **Meta-Llama-3.3-70B-Instruct** and **Qwen3-32B** via the SambaNova API. | |
* Optional Research Agents (**Web Search**, **Wikipedia**, **arXiv**, **GitHub**, **SEC EDGAR**) added via the [Open Floor Protocol](https://github.com/open-voice-interoperability/openfloor-docs). | |
* Assign different roles to the models, the protocol they should follow, and decide the communication strategy. | |
* Pick one model as the lead analyst (had the best results when picking Mistral). | |
* Configure the amount of discussion rounds. | |
* After the discussion, the whole conversation and a final answer will be presented. | |
""") | |
# Hidden session state component | |
session_state = gr.State() | |
with gr.Tab("π Expert Consensus Analysis"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
question_input = gr.Textbox( | |
label="π― Strategic Decision Question", | |
placeholder="What complex decision would you like expert AI analysis on?", | |
lines=3, | |
value="Should our startup pivot to AI-first product development?" | |
) | |
# Professional question suggestion buttons | |
with gr.Accordion("βοΈ Example Questions", open=True): | |
suggestion_btn1 = gr.Button("π’ Business Strategy", size="sm") | |
suggestion_btn2 = gr.Button("βοΈ Technology Choice", size="sm") | |
suggestion_btn3 = gr.Button("π Policy Analysis", size="sm") | |
with gr.Row(): | |
decision_protocol = gr.Dropdown( | |
choices=["consensus", "majority_voting", "weighted_voting", "ranked_choice", "unanimity"], | |
value="consensus", | |
label="βοΈ Decision Protocol", | |
info="How should experts reach a conclusion?" | |
) | |
role_assignment = gr.Dropdown( | |
choices=["balanced", "specialized", "adversarial", "none"], | |
value="balanced", | |
label="π Expert Roles", | |
info="How should expertise be distributed?" | |
) | |
with gr.Row(): | |
topology = gr.Dropdown( | |
choices=["full_mesh", "star", "ring"], | |
value="full_mesh", | |
label="π Communication Structure", | |
info="Full mesh: all collaborate, Star: through moderator, Ring: sequential" | |
) | |
moderator_model = gr.Dropdown( | |
choices=["mistral", "sambanova_deepseek", "sambanova_llama", "sambanova_qwen"], | |
value="mistral", | |
label="π¨ββοΈ Lead Analyst", | |
info="Mistral works best as Lead" | |
) | |
rounds_input = gr.Slider( | |
minimum=1, maximum=5, value=2, step=1, | |
label="π Discussion Rounds", | |
info="More rounds = deeper analysis" | |
) | |
start_btn = gr.Button("π Start Expert Analysis", variant="primary", size="lg") | |
status_output = gr.Textbox(label="π Analysis Status", interactive=False) | |
with gr.Column(scale=2): | |
# The visual roundtable component | |
roundtable = consilium_roundtable( | |
label="AI Expert Roundtable", | |
label_icon="https://avatars.githubusercontent.com/u/46052400?s=48&v=4", | |
value=json.dumps({ | |
"participants": [], | |
"messages": [], | |
"currentSpeaker": None, | |
"thinking": [], | |
"showBubbles": [], | |
"avatarImages": avatar_images | |
}) | |
) | |
# Final answer section | |
with gr.Row(): | |
final_answer_output = gr.Markdown( | |
label="π― Expert Analysis Results", | |
value="*Expert analysis results will appear here...*" | |
) | |
# Collapsible discussion log | |
with gr.Accordion("π Complete Expert Discussion Log", open=False): | |
discussion_log_output = gr.Markdown( | |
value="*Complete expert discussion transcript will appear here...*" | |
) | |
# Professional question handlers | |
def set_business_question(): | |
return "Should our startup pivot to AI-first product development?" | |
def set_tech_question(): | |
return "Microservices vs monolith architecture for our scaling platform?" | |
def set_policy_question(): | |
return "Should we prioritize geoengineering research over emissions reduction?" | |
suggestion_btn1.click(set_business_question, outputs=[question_input]) | |
suggestion_btn2.click(set_tech_question, outputs=[question_input]) | |
suggestion_btn3.click(set_policy_question, outputs=[question_input]) | |
# Event handlers | |
def on_start_discussion(question, rounds, protocol, roles, topology, moderator, session_id_state, request: gr.Request = None): | |
# Start discussion immediately | |
result = run_consensus_discussion_session(question, rounds, protocol, roles, topology, moderator, session_id_state, request) | |
return result | |
start_btn.click( | |
on_start_discussion, | |
inputs=[question_input, rounds_input, decision_protocol, role_assignment, topology, moderator_model, session_state], | |
outputs=[status_output, roundtable, final_answer_output, discussion_log_output, session_state] | |
) | |
# Auto-refresh the roundtable state every 1 second during discussion for better visibility | |
def refresh_roundtable(session_id_state, request: gr.Request = None): | |
session_id = get_session_id(request) if not session_id_state else session_id_state | |
if session_id in user_sessions: | |
return json.dumps(user_sessions[session_id]["roundtable_state"]) | |
return json.dumps({ | |
"participants": [], | |
"messages": [], | |
"currentSpeaker": None, | |
"thinking": [], | |
"showBubbles": [], | |
"avatarImages": avatar_images | |
}) | |
gr.Timer(1.0).tick(refresh_roundtable, inputs=[session_state], outputs=[roundtable]) | |
with gr.Tab("π§ Configuration & Setup"): | |
gr.Markdown("## π API Keys Configuration") | |
gr.Markdown("*Enter your API keys below OR set them as environment variables*") | |
gr.Markdown("**π Privacy:** Your API keys are stored only for your session and are not shared with other users.") | |
with gr.Row(): | |
with gr.Column(): | |
mistral_key_input = gr.Textbox( | |
label="Mistral API Key", | |
placeholder="Enter your Mistral API key...", | |
type="password", | |
info="Required for Mistral Large expert model with function calling" | |
) | |
sambanova_key_input = gr.Textbox( | |
label="SambaNova API Key", | |
placeholder="Enter your SambaNova API key...", | |
type="password", | |
info="Required for DeepSeek, Llama, and QwQ expert models with function calling" | |
) | |
with gr.Column(): | |
# Add a button to save/update keys | |
save_keys_btn = gr.Button("πΎ Save API Keys", variant="secondary") | |
keys_status = gr.Textbox( | |
label="Keys Status", | |
value="No API keys configured - using environment variables if available", | |
interactive=False | |
) | |
# Connect the save button | |
save_keys_btn.click( | |
update_session_api_keys, | |
inputs=[mistral_key_input, sambanova_key_input, session_state], | |
outputs=[keys_status, session_state] | |
) | |
model_status_display = gr.Markdown(check_model_status_session()) | |
# Add refresh button for model status | |
refresh_status_btn = gr.Button("π Refresh Expert Status") | |
refresh_status_btn.click( | |
check_model_status_session, | |
inputs=[session_state], | |
outputs=[model_status_display] | |
) | |
gr.Markdown(""" | |
## π οΈ Setup Instructions | |
### π Quick Start (Recommended) | |
1. **Enter API keys above** (they'll be used only for your session) | |
2. **Click "Save API Keys"** | |
3. **Start an expert analysis with live research!** | |
### π Get API Keys: | |
- **Mistral:** [console.mistral.ai](https://console.mistral.ai) | |
- **SambaNova:** [cloud.sambanova.ai](https://cloud.sambanova.ai) | |
## Local Setups | |
### π Environment Variables | |
```bash | |
export MISTRAL_API_KEY=your_key_here | |
export SAMBANOVA_API_KEY=your_key_here | |
export MODERATOR_MODEL=mistral | |
``` | |
### π Dependencies | |
```bash | |
pip install -r requirements.txt | |
``` | |
### Start | |
```bash | |
python app.py | |
``` | |
""") | |
with gr.Tab("π Documentation"): | |
gr.Markdown(""" | |
## π **Expert Role Assignments** | |
#### **βοΈ Balanced (Recommended for Most Decisions)** | |
- **Expert Advocate**: Passionate defender with compelling evidence | |
- **Critical Analyst**: Rigorous critic identifying flaws and risks | |
- **Strategic Advisor**: Practical implementer focused on real-world constraints | |
- **Research Specialist**: Authoritative knowledge with evidence-based insights | |
#### **π― Specialized (For Technical Decisions)** | |
- **Research Specialist**: Deep domain expertise and authoritative analysis | |
- **Strategic Advisor**: Implementation-focused practical guidance | |
- **Innovation Catalyst**: Breakthrough approaches and unconventional thinking | |
- **Expert Advocate**: Passionate championing of specialized viewpoints | |
#### **βοΈ Adversarial (For Controversial Topics)** | |
- **Critical Analyst**: Aggressive identification of weaknesses | |
- **Innovation Catalyst**: Deliberately challenging conventional wisdom | |
- **Expert Advocate**: Passionate defense of positions | |
- **Strategic Advisor**: Hard-nosed practical constraints | |
## βοΈ **Decision Protocols Explained** | |
### π€ **Consensus** (Collaborative) | |
- **Goal**: Find solutions everyone can support | |
- **Style**: Respectful but rigorous dialogue | |
- **Best for**: Team decisions, long-term strategy | |
- **Output**: "Expert Consensus Achieved" or areas of disagreement | |
### π³οΈ **Majority Voting** (Competitive) | |
- **Goal**: Let the strongest argument win | |
- **Style**: Passionate advocacy and strong positions | |
- **Best for**: Clear either/or decisions | |
- **Output**: "Clear Expert Recommendation" with winning argument | |
### π **Weighted Voting** (Expertise-Based) | |
- **Goal**: Let expertise and evidence quality determine influence | |
- **Style**: Authoritative analysis with detailed reasoning | |
- **Best for**: Technical decisions requiring deep knowledge | |
- **Output**: Expert synthesis weighted by confidence levels | |
### π **Ranked Choice** (Comprehensive) | |
- **Goal**: Explore all options systematically | |
- **Style**: Systematic evaluation of alternatives | |
- **Best for**: Complex decisions with multiple options | |
- **Output**: Ranked recommendations with detailed analysis | |
### π **Unanimity** (Diplomatic) | |
- **Goal**: Achieve complete agreement | |
- **Style**: Bridge-building and diplomatic dialogue | |
- **Best for**: High-stakes decisions requiring buy-in | |
- **Output**: Unanimous agreement or identification of blocking issues | |
## π **Communication Structures** | |
### πΈοΈ **Full Mesh** (Complete Collaboration) | |
- Every expert sees all other expert responses | |
- Maximum information sharing and cross-pollination | |
- Best for comprehensive analysis and complex decisions | |
- **Use when:** You want thorough multi-perspective analysis | |
### β **Star** (Hierarchical Analysis) | |
- Experts only see the lead analyst's responses | |
- Prevents groupthink, maintains independent thinking | |
- Good for getting diverse, uninfluenced perspectives | |
- **Use when:** You want fresh, independent expert takes | |
### π **Ring** (Sequential Analysis) | |
- Each expert only sees the previous expert's response | |
- Creates interesting chains of reasoning and idea evolution | |
- Can lead to surprising consensus emergence | |
- **Use when:** You want to see how ideas build and evolve | |
## ποΈ **OpenFloor Protocol Integration** | |
This implementation uses the [Open Floor Protocol (OFP)](https://github.com/open-voice-interoperability/openfloor-docs) for: | |
### π **Inter-Agent Communication** | |
- **Envelope Structure**: Standard JSON format for agent messaging | |
- **Event Types**: Utterance, Context, Invite, Bye events | |
- **Conversation Management**: Persistent conversation threads | |
- **Agent Discovery**: Manifest-based capability matching | |
### π **Research Agent Architecture** | |
- **Dedicated Services**: Each research tool runs as an independent OpenFloor agent | |
- **HTTP Endpoints**: `/openfloor/conversation` and `/openfloor/manifest` | |
- **Proper Invitation Flow**: Agents are invited to join conversations for specific research tasks | |
- **Context Events**: Research results are delivered via standardized context events | |
### ποΈ **Floor Manager** | |
- **Central Coordination**: Routes messages between all agents | |
- **Conversation State**: Maintains participant lists and message history | |
- **Visual Integration**: Updates UI based on OpenFloor conversation events | |
- **Session Isolation**: Each user session has its own conversation context | |
### π **Event Flow Example** | |
1. **AI Expert** needs research β generates function call | |
2. **Floor Manager** invites **Research Agent** to conversation | |
3. **Research Agent** joins and receives query via UtteranceEvent | |
4. **Research Agent** performs search and responds with ContextEvent | |
5. **Floor Manager** routes result back to requesting expert | |
6. **Research Agent** sends ByeEvent and leaves conversation | |
This architecture ensures true OpenFloor compliance while maintaining the visual roundtable experience. | |
""") | |
# Launch configuration | |
if __name__ == "__main__": | |
demo.queue(default_concurrency_limit=10) | |
demo.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=False, | |
debug=False, | |
mcp_server=False | |
) |