Spaces:
Running
Running
import aiohttp | |
import re | |
from typing import Dict, Any, List | |
from src.utils.config import config | |
from src.utils.logger import get_logger | |
logger = get_logger(__name__) | |
class AIRAAIntegration: | |
def __init__(self): | |
self.webhook_url = config.AIRAA_WEBHOOK_URL | |
self.api_key = config.AIRAA_API_KEY | |
self.enabled = bool(self.webhook_url) | |
async def send_research_data(self, research_result: Dict[str, Any]) -> bool: | |
if not self.enabled: | |
return False | |
try: | |
payload = self._format_for_airaa(research_result) | |
headers = {"Content-Type": "application/json"} | |
if self.api_key: | |
headers["Authorization"] = f"Bearer {self.api_key}" | |
timeout = aiohttp.ClientTimeout(total=30) | |
async with aiohttp.ClientSession(timeout=timeout) as session: | |
async with session.post( | |
self.webhook_url, json=payload, headers=headers | |
) as response: | |
success = response.status == 200 | |
if success: | |
logger.info("Data sent to AIRAA successfully") | |
else: | |
logger.warning(f"AIRAA webhook returned {response.status}") | |
return success | |
except Exception as e: | |
logger.error(f"AIRAA integration failed: {e}") | |
return False | |
def _format_for_airaa(self, result: Dict[str, Any]) -> Dict[str, Any]: | |
return { | |
"source": "web3-research-copilot", | |
"timestamp": result["metadata"]["timestamp"], | |
"query": result["query"], | |
"research_plan": result.get("research_plan", {}), | |
"findings": result["result"], | |
"data_sources": result["sources"], | |
"confidence_score": self._calculate_confidence(result), | |
"tags": self._extract_tags(result["query"]), | |
"structured_data": self._extract_structured_data(result["result"]) | |
} | |
def _calculate_confidence(self, result: Dict[str, Any]) -> float: | |
base_score = 0.7 | |
source_boost = min(len(result.get("sources", [])) * 0.1, 0.3) | |
error_penalty = 0.3 if not result.get("success", True) else 0 | |
return max(0.0, min(1.0, base_score + source_boost - error_penalty)) | |
def _extract_tags(self, query: str) -> List[str]: | |
tags = [] | |
query_lower = query.lower() | |
token_patterns = { | |
"bitcoin": ["bitcoin", "btc"], | |
"ethereum": ["ethereum", "eth"], | |
"defi": ["defi", "defillama", "protocol", "tvl"], | |
"market-analysis": ["price", "market", "analysis"], | |
"trading-volume": ["volume", "trading"] | |
} | |
for tag, patterns in token_patterns.items(): | |
if any(pattern in query_lower for pattern in patterns): | |
tags.append(tag) | |
return tags | |
def _extract_structured_data(self, result_text: str) -> Dict[str, Any]: | |
structured = {} | |
price_pattern = r'\$([0-9,]+\.?[0-9]*)' | |
percentage_pattern = r'([+-]?[0-9]+\.?[0-9]*)%' | |
prices = re.findall(price_pattern, result_text) | |
percentages = re.findall(percentage_pattern, result_text) | |
if prices: | |
structured["prices"] = [float(p.replace(',', '')) for p in prices[:5]] | |
if percentages: | |
structured["percentages"] = [float(p) for p in percentages[:5]] | |
return structured | |