Spaces:
Running
Running
File size: 3,603 Bytes
f104fee 9b006e9 f104fee |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
import aiohttp
import re
from typing import Dict, Any, List
from src.utils.config import config
from src.utils.logger import get_logger
logger = get_logger(__name__)
class AIRAAIntegration:
def __init__(self):
self.webhook_url = config.AIRAA_WEBHOOK_URL
self.api_key = config.AIRAA_API_KEY
self.enabled = bool(self.webhook_url)
async def send_research_data(self, research_result: Dict[str, Any]) -> bool:
if not self.enabled:
return False
try:
payload = self._format_for_airaa(research_result)
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(
self.webhook_url, json=payload, headers=headers
) as response:
success = response.status == 200
if success:
logger.info("Data sent to AIRAA successfully")
else:
logger.warning(f"AIRAA webhook returned {response.status}")
return success
except Exception as e:
logger.error(f"AIRAA integration failed: {e}")
return False
def _format_for_airaa(self, result: Dict[str, Any]) -> Dict[str, Any]:
return {
"source": "web3-research-copilot",
"timestamp": result["metadata"]["timestamp"],
"query": result["query"],
"research_plan": result.get("research_plan", {}),
"findings": result["result"],
"data_sources": result["sources"],
"confidence_score": self._calculate_confidence(result),
"tags": self._extract_tags(result["query"]),
"structured_data": self._extract_structured_data(result["result"])
}
def _calculate_confidence(self, result: Dict[str, Any]) -> float:
base_score = 0.7
source_boost = min(len(result.get("sources", [])) * 0.1, 0.3)
error_penalty = 0.3 if not result.get("success", True) else 0
return max(0.0, min(1.0, base_score + source_boost - error_penalty))
def _extract_tags(self, query: str) -> List[str]:
tags = []
query_lower = query.lower()
token_patterns = {
"bitcoin": ["bitcoin", "btc"],
"ethereum": ["ethereum", "eth"],
"defi": ["defi", "defillama", "protocol", "tvl"],
"market-analysis": ["price", "market", "analysis"],
"trading-volume": ["volume", "trading"]
}
for tag, patterns in token_patterns.items():
if any(pattern in query_lower for pattern in patterns):
tags.append(tag)
return tags
def _extract_structured_data(self, result_text: str) -> Dict[str, Any]:
structured = {}
price_pattern = r'\$([0-9,]+\.?[0-9]*)'
percentage_pattern = r'([+-]?[0-9]+\.?[0-9]*)%'
prices = re.findall(price_pattern, result_text)
percentages = re.findall(percentage_pattern, result_text)
if prices:
structured["prices"] = [float(p.replace(',', '')) for p in prices[:5]]
if percentages:
structured["percentages"] = [float(p) for p in percentages[:5]]
return structured
|