consilium_ofp / openfloor_helper /OpenFloorResearchAgent.py
azettl's picture
Update openfloor_helper/OpenFloorResearchAgent.py
16cff0c verified
from research_tools.base_tool import BaseTool
from openfloor.manifest import *
from openfloor.envelope import *
class OpenFloorResearchAgent:
"""Wrap research tools as independent OpenFloor agents"""
def __init__(self, tool: BaseTool, port: int = None):
self.tool = tool
self.port = port
self.manifest = self._create_manifest()
self.active_conversations = {}
def _create_manifest(self) -> Manifest:
"""Create OpenFloor manifest for this research agent"""
speaker_uri = f"tag:research.consilium,2025:{self.tool.name.lower().replace(' ', '-')}-agent"
# Tool-specific keyphrases and capabilities
tool_configs = {
'Web Search': {
'keyphrases': ['web', 'search', 'current', 'news', 'latest', 'recent'],
'synopsis': 'Real-time web search for current information and trends'
},
'Wikipedia': {
'keyphrases': ['facts', 'encyclopedia', 'history', 'knowledge', 'definition'],
'synopsis': 'Authoritative encyclopedia research and factual verification'
},
'arXiv': {
'keyphrases': ['academic', 'research', 'papers', 'science', 'study'],
'synopsis': 'Academic research papers and scientific literature analysis'
},
'GitHub': {
'keyphrases': ['technology', 'code', 'development', 'programming', 'trends'],
'synopsis': 'Technology adoption trends and software development analysis'
},
'SEC EDGAR': {
'keyphrases': ['financial', 'company', 'earnings', 'sec', 'filings'],
'synopsis': 'Corporate financial data and SEC regulatory filings research'
}
}
config = tool_configs.get(self.tool.name, {
'keyphrases': ['research', 'data'],
'synopsis': self.tool.description
})
return Manifest(
identification=Identification(
speakerUri=speaker_uri,
serviceUrl=f"http://localhost:{self.port}/openfloor" if self.port else None,
conversationalName=f"{self.tool.name} Research Agent",
organization="Consilium Research Division",
role="Research Specialist",
synopsis=config['synopsis']
),
capabilities=[
Capability(
keyphrases=config['keyphrases'],
descriptions=[self.tool.description],
languages=["en-us"]
)
]
)
def handle_utterance_event(self, envelope: Envelope) -> Envelope:
"""Handle research requests from AI experts"""
print(f"πŸ” DEBUG: {self.tool.name} - Starting handle_utterance_event")
# Extract the query from the utterance
for event in envelope.events:
if hasattr(event, 'eventType') and event.eventType == 'utterance':
dialog_event = event.parameters.get('dialogEvent')
if dialog_event and isinstance(dialog_event, dict):
# dialog_event is a dict, not an object - use dict access
features = dialog_event.get('features')
print(f"πŸ” DEBUG: features: {features}")
if features and 'text' in features:
text_feature = features['text']
print(f"πŸ” DEBUG: text_feature: {text_feature}")
if 'tokens' in text_feature:
tokens = text_feature['tokens']
query_text = ' '.join([token.get('value', '') for token in tokens])
print(f"πŸ” DEBUG: {self.tool.name} received query: '{query_text}'")
# Perform the research
import time
start_time = time.time()
research_result = self.tool.search(query_text)
end_time = time.time()
print(f"πŸ” DEBUG: {self.tool.name} completed in {end_time - start_time:.2f}s")
print(f"πŸ” DEBUG: Result length: {len(research_result)} chars")
print(f"πŸ” DEBUG: Result preview: {research_result[:200]}...")
# Create response envelope
return self._create_response_envelope(envelope, research_result, query_text)
return self._create_error_response(envelope, "Could not extract query from request")
def _create_response_envelope(self, original_envelope: Envelope, research_result: str, query: str) -> Envelope:
"""Create OpenFloor response envelope with research results"""
# Create response dialog event
response_dialog = DialogEvent(
speakerUri=self.manifest.identification.speakerUri,
features={
"text": TextFeature(values=[research_result])
}
)
# Create context with research metadata
research_context = ContextEvent(
parameters={
"research_tool": self.tool.name,
"query": query,
"source": self.tool.name.lower().replace(' ', '_'),
"confidence": self._assess_result_confidence(research_result),
"timestamp": datetime.now().isoformat()
}
)
# Create response envelope
response_envelope = Envelope(
conversation=original_envelope.conversation,
sender=Sender(speakerUri=self.manifest.identification.speakerUri),
events=[
UtteranceEvent(dialogEvent=response_dialog),
research_context
]
)
return response_envelope
def _assess_result_confidence(self, result: str) -> float:
"""Assess confidence in research result quality"""
if not result or len(result) < 50:
return 0.3
quality_indicators = [
(len(result) > 500, 0.2), # Substantial content
(any(year in result for year in ['2024', '2025']), 0.2), # Recent data
(result.count('\n') > 5, 0.1), # Well-structured
('error' not in result.lower(), 0.3), # No errors
(any(indicator in result.lower() for indicator in ['data', 'study', 'research']), 0.2) # Authoritative
]
confidence = 0.5 # Base confidence
for condition, boost in quality_indicators:
if condition:
confidence += boost
return min(1.0, confidence)
def _create_error_response(self, original_envelope: Envelope, error_msg: str) -> Envelope:
"""Create error response envelope"""
error_dialog = DialogEvent(
speakerUri=self.manifest.identification.speakerUri,
features={
"text": TextFeature(values=[f"Research error: {error_msg}"])
}
)
return Envelope(
conversation=original_envelope.conversation,
sender=Sender(speakerUri=self.manifest.identification.speakerUri),
events=[UtteranceEvent(dialogEvent=error_dialog)]
)
def join_conversation(self, conversation_id: str) -> bool:
"""Join a conversation as an active research agent"""
self.active_conversations[conversation_id] = {
'joined_at': datetime.now(),
'status': 'active'
}
return True
def leave_conversation(self, conversation_id: str) -> bool:
"""Leave a conversation"""
if conversation_id in self.active_conversations:
del self.active_conversations[conversation_id]
return True
def get_manifest(self) -> Manifest:
"""Return the OpenFloor manifest for this research agent"""
return self.manifest