web3-copilot / src /news_aggregator.py
“Transcendental-Programmer”
feat: Initial commit with project structure and initial files
20eee66
raw
history blame
3.21 kB
import aiohttp
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
import json
class CryptoNewsAggregator:
def __init__(self):
self.sources = {
"cryptonews": "https://cryptonews-api.com/api/v1/category?section=general&items=10",
"newsapi": "https://newsapi.org/v2/everything?q=cryptocurrency&sortBy=publishedAt&pageSize=10",
"coindesk": "https://api.coindesk.com/v1/news/articles"
}
self.session = None
async def get_session(self):
if self.session is None:
timeout = aiohttp.ClientTimeout(total=30)
headers = {"User-Agent": "Web3-Research-CoBot/1.0"}
self.session = aiohttp.ClientSession(timeout=timeout, headers=headers)
return self.session
async def get_crypto_news(self, limit: int = 10) -> List[Dict[str, Any]]:
news_items = []
tasks = []
for source, url in self.sources.items():
tasks.append(self._fetch_news_from_source(source, url))
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if not isinstance(result, Exception) and result:
news_items.extend(result[:5])
news_items.sort(key=lambda x: x.get("timestamp", 0), reverse=True)
return news_items[:limit]
async def _fetch_news_from_source(self, source: str, url: str) -> List[Dict[str, Any]]:
try:
session = await self.get_session()
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
return self._parse_news_data(source, data)
return []
except Exception:
return []
def _parse_news_data(self, source: str, data: Dict[str, Any]) -> List[Dict[str, Any]]:
news_items = []
current_time = datetime.now().timestamp()
try:
if source == "cryptonews" and "data" in data:
for item in data["data"][:5]:
news_items.append({
"title": item.get("news_title", ""),
"summary": item.get("text", "")[:200] + "...",
"url": item.get("news_url", ""),
"source": "CryptoNews",
"timestamp": current_time
})
elif source == "newsapi" and "articles" in data:
for item in data["articles"][:5]:
news_items.append({
"title": item.get("title", ""),
"summary": item.get("description", "")[:200] + "...",
"url": item.get("url", ""),
"source": item.get("source", {}).get("name", "NewsAPI"),
"timestamp": current_time
})
except Exception:
pass
return news_items
async def close(self):
if self.session:
await self.session.close()
news_aggregator = CryptoNewsAggregator()