consilium_ofp / openfloor_helper /OpenFloorManager.py
azettl's picture
Update openfloor_helper/OpenFloorManager.py
14b5df8 verified
from openfloor.manifest import *
from openfloor.envelope import *
# Model Images
avatar_images = {
"Qwen3-32B": "https://cdn-avatars.huggingface.co/v1/production/uploads/620760a26e3b7210c2ff1943/-s1gyJfvbE1RgO5iBeNOi.png",
"DeepSeek-R1": "https://logosandtypes.com/wp-content/uploads/2025/02/deepseek.svg",
"Mistral Large": "https://logosandtypes.com/wp-content/uploads/2025/02/mistral-ai.svg",
"Meta-Llama-3.3-70B-Instruct": "https://registry.npmmirror.com/@lobehub/icons-static-png/1.46.0/files/dark/meta-color.png",
"arXiv Research Agent": "https://public.boxcloud.com/api/2.0/internal_files/804104772302/versions/860288648702/representations/png_paged_2048x2048/content/1.png?access_token=1!r4Iuj5vkFMywOMAPQ4M6QIr3eqkJ6CjlMzh77DAkRcGdVRvzG-Xh6GFZz_JkzoJuO9yRR5cQ6cs5VvUolhHxNM6JdliJ2JOi9VWm-BbB5C63s0_7bpaQYLFAJmLnlG2RzhX74_bK4XS-csGP8CI-9tVa6LUcrCNTKJmc-yddIepopLMZLqJ34h0nu69Yt0Not4yDErBTk2jWaneTBdhdXErOhCs9cz4HK-itpCfdL3Lze9oAjf6o8EVWRn6R0YPw95trQl7IziLd1P78BFuVaDjborvhs_yWgcw0uxXNZz_8WZh5z5NOvDq6sMo0uYGWiJ_g1JWyiaDJpsWBlHRiRwwF5FZLsVSXRz6dXD1MtKyOPs8J6CBYkGisicIysuiPsT1Kcyrgm-3jH1-tanOVs66TCmnGNbSYH_o_-x9iOdkI8rEL7-l2i5iHn22i-q8apZTOd_eQp22UCsmUBJQig7att_AwVKasmqOegDZHO2h1b_vSjeZ8ISBcg8i7fnFdF9Ej35s6OFkV5IyZtMzbAKdRlwdt5lupsshO5FCByR0kau9PVIiwJilI0t7zYsJtSXzVxVQEyEPuLTAlJJI7827NoNA1OSojaPsfhFrW4jEfJIgMoxNl_vFfZvLBmAA7Pk1SeaN7J0ebDji-bDbwqlPadp7JOB3s2Six11fm4Ss.&shared_link=https%3A%2F%2Fcornell.app.box.com%2Fv%2Farxiv-logomark-small-png&box_client_name=box-content-preview&box_client_version=3.7.0",
"GitHub Research Agent": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/c2/GitHub_Invertocat_Logo.svg/250px-GitHub_Invertocat_Logo.svg.png",
"SEC EDGAR Research Agent": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/1c/Seal_of_the_United_States_Securities_and_Exchange_Commission.svg/250px-Seal_of_the_United_States_Securities_and_Exchange_Commission.svg.png",
"Web Search Research Agent": "https://duckduckgo.com/static-assets/favicons/DDG-iOS-icon_76x76.png",
"Wikipedia Research Agent": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/80/Wikipedia-logo-v2.svg/103px-Wikipedia-logo-v2.svg.png"
}
class OpenFloorManager:
"""Central floor manager for coordinating all OpenFloor agents - FIXED VERSION"""
def __init__(self, port: int = 7860):
self.port = port
self.agent_registry = {} # speakerUri -> agent info
self.active_conversations = {} # conversation_id -> conversation state
self.visual_callback = None
self.message_history = {} # conversation_id -> message list
def register_agent(self, manifest: Manifest, agent_url: str):
"""Register an agent with the floor manager"""
speaker_uri = manifest.identification.speakerUri
self.agent_registry[speaker_uri] = {
'manifest': manifest,
'url': agent_url,
'status': 'available',
'last_seen': datetime.now()
}
print(f"πŸ›οΈ Floor Manager: Registered agent {manifest.identification.conversationalName}")
def discover_agents(self) -> List[Manifest]:
"""Return manifests of all registered agents"""
return [info['manifest'] for info in self.agent_registry.values()]
def create_conversation(self, initial_participants: List[str] = None) -> str:
"""Create a new conversation with optional initial participants"""
conversation_id = f"conv:{uuid.uuid4()}"
# Create proper OpenFloor conversation structure
conversants = []
if initial_participants:
for participant_uri in initial_participants:
if participant_uri in self.agent_registry:
manifest = self.agent_registry[participant_uri]['manifest']
conversants.append(Conversant(
identification=manifest.identification
))
conversation = Conversation(
id=conversation_id,
conversants=conversants
)
self.active_conversations[conversation_id] = {
'conversation': conversation,
'participants': initial_participants or [],
'messages': [],
'created_at': datetime.now(),
'status': 'active'
}
self.message_history[conversation_id] = []
print(f"πŸ›οΈ Floor Manager: Created conversation {conversation_id}")
return conversation_id
def invite_agent_to_conversation(self, conversation_id: str, target_speaker_uri: str,
inviting_speaker_uri: str) -> bool:
"""Send proper OpenFloor InviteEvent to an agent"""
if conversation_id not in self.active_conversations:
print(f"πŸ›οΈ Floor Manager: Conversation {conversation_id} not found")
return False
if target_speaker_uri not in self.agent_registry:
print(f"πŸ›οΈ Floor Manager: Agent {target_speaker_uri} not registered")
return False
conversation_state = self.active_conversations[conversation_id]
conversation = conversation_state['conversation']
target_agent = self.agent_registry[target_speaker_uri]
# Create proper OpenFloor InviteEvent
invite_envelope = Envelope(
conversation=conversation,
sender=Sender(speakerUri=inviting_speaker_uri),
events=[
InviteEvent(
to=To(speakerUri=target_speaker_uri),
parameters={
'conversation_id': conversation_id,
'invited_by': inviting_speaker_uri,
'invitation_message': f"You are invited to join the expert analysis discussion"
}
)
]
)
# Send invitation to target agent
response = self._send_to_agent(target_agent['url'], invite_envelope)
if response:
# Add to conversation participants
if target_speaker_uri not in conversation_state['participants']:
conversation_state['participants'].append(target_speaker_uri)
# Add to conversation conversants
target_manifest = target_agent['manifest']
conversation.conversants.append(Conversant(
identification=target_manifest.identification
))
self._update_visual_state(conversation_id)
print(f"πŸ›οΈ Floor Manager: Successfully invited {target_speaker_uri} to {conversation_id}")
return True
print(f"πŸ›οΈ Floor Manager: Failed to invite {target_speaker_uri}")
return False
def route_message(self, envelope: Envelope) -> bool:
"""Route message to appropriate recipients with proper OpenFloor semantics"""
conversation_id = envelope.conversation.id
if conversation_id not in self.active_conversations:
print(f"πŸ›οΈ Floor Manager: Cannot route - conversation {conversation_id} not found")
return False
conversation_state = self.active_conversations[conversation_id]
sender_uri = envelope.sender.speakerUri
# Store message in conversation history
message_record = {
'envelope': envelope,
'timestamp': datetime.now(),
'sender': sender_uri
}
conversation_state['messages'].append(message_record)
self.message_history[conversation_id].append(message_record)
# Process each event in the envelope
routed_successfully = True
for event in envelope.events:
if hasattr(event, 'to') and event.to:
# Directed message - send to specific agent
target_uri = event.to.speakerUri
if target_uri in self.agent_registry and target_uri != sender_uri:
target_agent = self.agent_registry[target_uri]
success = self._send_to_agent(target_agent['url'], envelope)
if not success:
routed_successfully = False
print(f"πŸ›οΈ Floor Manager: Failed to route directed message to {target_uri}")
else:
# Broadcast to all conversation participants except sender
for participant_uri in conversation_state['participants']:
if participant_uri != sender_uri and participant_uri in self.agent_registry:
participant_agent = self.agent_registry[participant_uri]
success = self._send_to_agent(participant_agent['url'], envelope)
if not success:
routed_successfully = False
print(f"πŸ›οΈ Floor Manager: Failed to broadcast to {participant_uri}")
# Update visual state after routing
self._update_visual_state(conversation_id)
return routed_successfully
def _send_to_agent(self, agent_url: str, envelope: Envelope) -> bool:
"""Send envelope to specific agent via HTTP POST"""
if agent_url == "internal://ai-expert":
# Internal AI experts don't have HTTP endpoints
return True
try:
# Fix: Use proper OpenFloor endpoint
response = requests.post(
f"{agent_url}/openfloor/conversation", # βœ… Fixed endpoint
json=json.loads(envelope.to_json()),
headers={'Content-Type': 'application/json'},
timeout=30
)
success = response.status_code == 200
if success:
print(f"πŸ›οΈ Floor Manager: Successfully sent message to {agent_url}")
else:
print(f"πŸ›οΈ Floor Manager: HTTP error {response.status_code} sending to {agent_url}")
print(f"πŸ›οΈ Floor Manager: Response: {response.text}")
return success
except Exception as e:
print(f"πŸ›οΈ Floor Manager: Error sending to {agent_url}: {e}")
return False
def _update_visual_state(self, conversation_id: str):
"""Update visual interface based on conversation state"""
if not self.visual_callback or conversation_id not in self.active_conversations:
return
conversation_state = self.active_conversations[conversation_id]
# Convert to visual format
participants = []
messages = []
# Get participant names
for participant_uri in conversation_state['participants']:
if participant_uri in self.agent_registry:
agent_info = self.agent_registry[participant_uri]
participants.append(agent_info['manifest'].identification.conversationalName)
else:
# Handle AI experts or other internal participants
participants.append(participant_uri.split(':')[-1] if ':' in participant_uri else participant_uri)
# Convert messages
for msg_record in conversation_state['messages']:
envelope = msg_record['envelope']
sender_uri = envelope.sender.speakerUri
# Get sender name
if sender_uri in self.agent_registry:
sender_name = self.agent_registry[sender_uri]['manifest'].identification.conversationalName
else:
sender_name = sender_uri.split(':')[-1] if ':' in sender_uri else sender_uri
# Extract message content from events
for event in envelope.events:
if hasattr(event, 'eventType'):
if event.eventType == 'utterance':
# Extract text from utterance dialog event
dialog_event = event.parameters.get('dialogEvent')
if dialog_event:
text = self._extract_text_from_dialog_event(dialog_event)
if text:
messages.append({
'speaker': sender_name,
'text': text,
'timestamp': msg_record['timestamp'].strftime('%H:%M:%S'),
'type': 'utterance'
})
elif event.eventType == 'context':
# Handle context events (like research results)
context_params = event.parameters
if 'research_function' in context_params:
messages.append({
'speaker': sender_name,
'text': f"πŸ” Research: {context_params.get('result', 'No result')}",
'timestamp': msg_record['timestamp'].strftime('%H:%M:%S'),
'type': 'research_result'
})
elif event.eventType == 'invite':
messages.append({
'speaker': sender_name,
'text': f"πŸ“¨ Invited agent to join conversation",
'timestamp': msg_record['timestamp'].strftime('%H:%M:%S'),
'type': 'system'
})
elif event.eventType == 'bye':
messages.append({
'speaker': sender_name,
'text': f"πŸ‘‹ Left the conversation",
'timestamp': msg_record['timestamp'].strftime('%H:%M:%S'),
'type': 'system'
})
# Call visual callback
self.visual_callback({
'participants': participants,
'messages': messages,
'currentSpeaker': None,
'thinking': [],
'showBubbles': participants,
'avatarImages': avatar_images
})
def _extract_text_from_dialog_event(self, dialog_event) -> str:
"""Extract text from dialog event structure"""
try:
if isinstance(dialog_event, dict):
features = dialog_event.get('features', {})
text_feature = features.get('text', {})
tokens = text_feature.get('tokens', [])
return ' '.join([token.get('value', '') for token in tokens])
return ""
except Exception as e:
print(f"πŸ›οΈ Floor Manager: Error extracting text: {e}")
return ""
def set_visual_callback(self, callback):
"""Set callback for visual updates"""
self.visual_callback = callback
def start_floor_manager_service(self):
"""Start the floor manager HTTP service"""
from flask import Flask, request, jsonify
app = Flask("openfloor-manager")
@app.route('/openfloor/discover', methods=['GET'])
def discover_agents():
"""Return list of available agent manifests"""
manifests = [json.loads(manifest.to_json()) for manifest in self.discover_agents()]
return jsonify(manifests)
@app.route('/openfloor/conversation', methods=['POST'])
def handle_conversation():
"""Handle incoming conversation messages"""
try:
envelope_data = request.get_json()
envelope = Envelope.from_json(json.dumps(envelope_data))
success = self.route_message(envelope)
if success:
return jsonify({'status': 'routed'})
else:
return jsonify({'error': 'Failed to route message'}), 400
except Exception as e:
print(f"πŸ›οΈ Floor Manager: Error handling conversation: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/openfloor/invite', methods=['POST'])
def invite_agent():
"""Handle agent invitation requests"""
try:
data = request.get_json()
conversation_id = data['conversation_id']
target_speaker_uri = data['target_speaker_uri']
inviting_speaker_uri = data['inviting_speaker_uri']
success = self.invite_agent_to_conversation(
conversation_id, target_speaker_uri, inviting_speaker_uri
)
return jsonify({'success': success})
except Exception as e:
print(f"πŸ›οΈ Floor Manager: Error inviting agent: {e}")
return jsonify({'error': str(e)}), 500
# Start server in background thread
import threading
server_thread = threading.Thread(
target=lambda: app.run(host='localhost', port=self.port + 100, debug=False)
)
server_thread.daemon = True
server_thread.start()
print(f"πŸ›οΈ OpenFloor Manager started on port {self.port + 100}")
return True