# Spaces:
# Running
# Running
import logging
import time
from datetime import datetime
from typing import Optional

from openfloor.manifest import *
from openfloor.envelope import *

from research_tools.base_tool import BaseTool
class OpenFloorResearchAgent:
    """Wrap a research tool as an independent OpenFloor agent.

    The agent builds a manifest describing the wrapped tool, answers
    utterance events by running ``tool.search`` on the text they carry, and
    keeps a record of the conversations it has joined.
    """

    # Diagnostics go through logging so they can be routed or silenced by the
    # host application instead of always being written to stdout.
    _logger = logging.getLogger(__name__)

    # Keyphrases and synopsis advertised in the manifest, keyed by tool name.
    # Tools that are not listed fall back to generic keyphrases and their own
    # description (see _create_manifest).
    _TOOL_CONFIGS = {
        'Web Search': {
            'keyphrases': ['web', 'search', 'current', 'news', 'latest', 'recent'],
            'synopsis': 'Real-time web search for current information and trends',
        },
        'Wikipedia': {
            'keyphrases': ['facts', 'encyclopedia', 'history', 'knowledge', 'definition'],
            'synopsis': 'Authoritative encyclopedia research and factual verification',
        },
        'arXiv': {
            'keyphrases': ['academic', 'research', 'papers', 'science', 'study'],
            'synopsis': 'Academic research papers and scientific literature analysis',
        },
        'GitHub': {
            'keyphrases': ['technology', 'code', 'development', 'programming', 'trends'],
            'synopsis': 'Technology adoption trends and software development analysis',
        },
        'SEC EDGAR': {
            'keyphrases': ['financial', 'company', 'earnings', 'sec', 'filings'],
            'synopsis': 'Corporate financial data and SEC regulatory filings research',
        },
    }

    def __init__(self, tool: BaseTool, port: Optional[int] = None):
        """Store the wrapped tool and build its manifest.

        Args:
            tool: Research tool to expose; its ``name``, ``description`` and
                ``search`` members are used by this class.
            port: Local port used to build the manifest's service URL. When
                omitted (or falsy) the manifest carries no service URL.
        """
        self.tool = tool
        self.port = port
        self.manifest = self._create_manifest()
        # conversation id -> {'joined_at': datetime, 'status': str}
        self.active_conversations = {}

    def _create_manifest(self) -> Manifest:
        """Create the OpenFloor manifest for this research agent."""
        tool_name = self.tool.name
        speaker_uri = f"tag:research.consilium,2025:{tool_name.lower().replace(' ', '-')}-agent"

        config = self._TOOL_CONFIGS.get(tool_name)
        if config is None:
            config = {
                'keyphrases': ['research', 'data'],
                'synopsis': self.tool.description,
            }

        service_url = f"http://localhost:{self.port}/openfloor" if self.port else None

        return Manifest(
            identification=Identification(
                speakerUri=speaker_uri,
                serviceUrl=service_url,
                conversationalName=f"{tool_name} Research Agent",
                organization="Consilium Research Division",
                role="Research Specialist",
                synopsis=config['synopsis'],
            ),
            capabilities=[
                Capability(
                    # Copy so the shared class-level table is never handed out.
                    keyphrases=list(config['keyphrases']),
                    descriptions=[self.tool.description],
                    languages=["en-us"],
                )
            ],
        )

    @staticmethod
    def _extract_query(event) -> Optional[str]:
        """Return the query text carried by an utterance event, if any.

        The text is read from
        ``event.parameters['dialogEvent']['features']['text']['tokens']`` and
        the token ``value`` entries are joined with single spaces.

        Returns:
            The joined text, or ``None`` when the event is not an utterance,
            any level of that structure is missing or is not a dict, or the
            joined text is blank.
        """
        if getattr(event, 'eventType', None) != 'utterance':
            return None

        dialog_event = event.parameters.get('dialogEvent')
        if not isinstance(dialog_event, dict):
            return None

        features = dialog_event.get('features')
        if not isinstance(features, dict):
            return None

        text_feature = features.get('text')
        if not isinstance(text_feature, dict):
            return None

        tokens = text_feature.get('tokens')
        if not tokens:
            return None

        values = (token.get('value') for token in tokens if isinstance(token, dict))
        query_text = ' '.join('' if value is None else str(value) for value in values)

        # A blank query carries nothing to search for.
        return query_text if query_text.strip() else None

    def handle_utterance_event(self, envelope: Envelope) -> Envelope:
        """Handle a research request and return the response envelope.

        The first utterance event that yields a non-blank query is passed to
        ``tool.search``; its result is wrapped in a response envelope. If no
        event yields a query, an error envelope is returned instead.
        Exceptions raised by ``tool.search`` are not caught here.
        """
        log = self._logger
        log.debug("%s - starting handle_utterance_event", self.tool.name)

        for event in envelope.events:
            query_text = self._extract_query(event)
            if query_text is None:
                continue

            log.debug("%s received query: %r", self.tool.name, query_text)

            started = time.perf_counter()
            research_result = self.tool.search(query_text)
            elapsed = time.perf_counter() - started

            log.debug("%s completed in %.2fs", self.tool.name, elapsed)
            log.debug("Result length: %d chars", len(research_result))
            log.debug("Result preview: %s...", research_result[:200])

            return self._create_response_envelope(envelope, research_result, query_text)

        return self._create_error_response(envelope, "Could not extract query from request")

    def _create_response_envelope(self, original_envelope: Envelope, research_result: str, query: str) -> Envelope:
        """Create the OpenFloor response envelope carrying research results.

        The envelope reuses the original conversation and contains two events:
        an utterance with the result text, and a context event with the tool
        name, query, source slug, confidence score and a timestamp.
        """
        speaker_uri = self.manifest.identification.speakerUri

        response_dialog = DialogEvent(
            speakerUri=speaker_uri,
            features={
                "text": TextFeature(values=[research_result])
            },
        )

        research_context = ContextEvent(
            parameters={
                "research_tool": self.tool.name,
                "query": query,
                "source": self.tool.name.lower().replace(' ', '_'),
                "confidence": self._assess_result_confidence(research_result),
                "timestamp": datetime.now().isoformat(),
            }
        )

        return Envelope(
            conversation=original_envelope.conversation,
            sender=Sender(speakerUri=speaker_uri),
            events=[
                UtteranceEvent(dialogEvent=response_dialog),
                research_context,
            ],
        )

    def _assess_result_confidence(self, result: str) -> float:
        """Score a research result with a simple text heuristic.

        Empty results and results shorter than 50 characters score 0.3.
        Otherwise the score starts at 0.5 and gains a boost for each indicator
        that holds, capped at 1.0.
        """
        if not result or len(result) < 50:
            return 0.3

        lowered = result.lower()  # computed once for the case-insensitive checks
        quality_indicators = [
            (len(result) > 500, 0.2),  # Substantial content
            (any(year in result for year in ('2024', '2025')), 0.2),  # Recent data
            (result.count('\n') > 5, 0.1),  # Well-structured
            ('error' not in lowered, 0.3),  # No errors
            (any(word in lowered for word in ('data', 'study', 'research')), 0.2),  # Authoritative
        ]

        confidence = 0.5  # Base confidence
        for condition, boost in quality_indicators:
            if condition:
                confidence += boost

        return min(1.0, confidence)

    def _create_error_response(self, original_envelope: Envelope, error_msg: str) -> Envelope:
        """Create an error response envelope for the original conversation.

        The envelope holds a single utterance whose text is
        ``"Research error: <error_msg>"``.
        """
        speaker_uri = self.manifest.identification.speakerUri

        error_dialog = DialogEvent(
            speakerUri=speaker_uri,
            features={
                "text": TextFeature(values=[f"Research error: {error_msg}"])
            },
        )

        return Envelope(
            conversation=original_envelope.conversation,
            sender=Sender(speakerUri=speaker_uri),
            events=[UtteranceEvent(dialogEvent=error_dialog)],
        )

    def join_conversation(self, conversation_id: str) -> bool:
        """Record this agent as active in a conversation; always returns True.

        Joining a conversation again overwrites its previous record.
        """
        self.active_conversations[conversation_id] = {
            'joined_at': datetime.now(),
            'status': 'active',
        }
        return True

    def leave_conversation(self, conversation_id: str) -> bool:
        """Forget a conversation; returns True even if it was never joined."""
        self.active_conversations.pop(conversation_id, None)
        return True

    def get_manifest(self) -> Manifest:
        """Return the OpenFloor manifest for this research agent."""
        return self.manifest