# Spaces: Running
import asyncio
import json
import logging
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Any, Dict, List, Optional

import aiohttp
class CryptoNewsAggregator:
    """Fetch recent cryptocurrency headlines from several HTTP news APIs.

    Each configured source is fetched concurrently, normalized into plain
    dictionaries with the keys ``title``, ``summary``, ``url``, ``source`` and
    ``timestamp`` (POSIX seconds), merged, and returned newest first.

    Fetching is best effort: a source that fails, answers with a non-200
    status, or returns an unexpected payload contributes no items and is
    logged instead of raising.

    NOTE(review): the default URLs carry no API key or token; the remote
    services presumably reject unauthenticated requests. Confirm, and pass
    keyed URLs through ``sources`` if so.
    """

    DEFAULT_SOURCES: Dict[str, str] = {
        "cryptonews": "https://cryptonews-api.com/api/v1/category?section=general&items=10",
        "newsapi": "https://newsapi.org/v2/everything?q=cryptocurrency&sortBy=publishedAt&pageSize=10",
        "coindesk": "https://api.coindesk.com/v1/news/articles",
    }

    # Upper bound on items kept from any single source.
    MAX_ITEMS_PER_SOURCE = 5
    # Maximum number of characters kept from an article body/description.
    SUMMARY_LENGTH = 200

    _logger = logging.getLogger(__name__)

    def __init__(self, sources: Optional[Dict[str, str]] = None):
        """Create an aggregator; no network activity happens here.

        Args:
            sources: Optional mapping of source name to request URL. When
                omitted, ``DEFAULT_SOURCES`` is used. The mapping is copied.
        """
        self.sources = dict(self.DEFAULT_SOURCES if sources is None else sources)
        # aiohttp.ClientSession, created lazily by get_session().
        self.session = None

    async def get_session(self) -> "aiohttp.ClientSession":
        """Return the shared HTTP session, creating it on first use.

        A session that was closed (for example through ``close()``) is
        replaced, so the aggregator stays usable afterwards.
        """
        if self.session is None or self.session.closed:
            timeout = aiohttp.ClientTimeout(total=30)
            headers = {"User-Agent": "Web3-Research-CoBot/1.0"}
            self.session = aiohttp.ClientSession(timeout=timeout, headers=headers)
        return self.session

    async def get_crypto_news(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Fetch all sources concurrently and return the newest items.

        Args:
            limit: Maximum number of items to return. Values below zero are
                treated as zero.

        Returns:
            Normalized news items sorted by ``timestamp``, newest first.
        """
        # Snapshot so results can be paired with their source by position.
        sources = list(self.sources.items())
        results = await asyncio.gather(
            *(self._fetch_news_from_source(name, url) for name, url in sources),
            return_exceptions=True,
        )

        news_items: List[Dict[str, Any]] = []
        for (name, _url), result in zip(sources, results):
            # BaseException also covers a cancelled fetch reported by gather().
            if isinstance(result, BaseException):
                self._logger.warning("News source %s failed: %r", name, result)
                continue
            if result:
                news_items.extend(result[: self.MAX_ITEMS_PER_SOURCE])

        news_items.sort(key=lambda item: item.get("timestamp", 0), reverse=True)
        return news_items[: max(limit, 0)]

    async def _fetch_news_from_source(self, source: str, url: str) -> List[Dict[str, Any]]:
        """Download and parse one source; return ``[]`` on any failure."""
        try:
            session = await self.get_session()
            async with session.get(url) as response:
                if response.status != 200:
                    self._logger.warning(
                        "News source %s answered HTTP %s", source, response.status
                    )
                    return []
                data = await response.json()
        except Exception as exc:
            # Deliberate best effort: one broken source must not sink the
            # others, but the failure should still be visible in the logs.
            self._logger.warning("Fetching news from %s failed: %r", source, exc)
            return []
        return self._parse_news_data(source, data)

    def _parse_news_data(self, source: str, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Normalize a decoded JSON payload from ``source``.

        Only the ``cryptonews`` (items under ``"data"``) and ``newsapi``
        (items under ``"articles"``) layouts are understood; every other
        source or payload shape yields an empty list. Entries that are not
        JSON objects are skipped individually.
        """
        if not isinstance(data, dict):
            return []

        fetched_at = datetime.now().timestamp()
        news_items: List[Dict[str, Any]] = []

        if source == "cryptonews" and "data" in data:
            for item in self._first_items(data["data"]):
                news_items.append({
                    "title": item.get("news_title") or "",
                    "summary": self._summarize(item.get("text")),
                    "url": item.get("news_url") or "",
                    "source": "CryptoNews",
                    # Assumes the feed exposes a "date" field; TODO confirm.
                    # Falls back to the fetch time when it is absent.
                    "timestamp": self._parse_timestamp(item.get("date"), fetched_at),
                })
        elif source == "newsapi" and "articles" in data:
            for item in self._first_items(data["articles"]):
                source_info = item.get("source")
                if not isinstance(source_info, dict):
                    source_info = {}
                news_items.append({
                    "title": item.get("title") or "",
                    "summary": self._summarize(item.get("description")),
                    "url": item.get("url") or "",
                    "source": source_info.get("name") or "NewsAPI",
                    "timestamp": self._parse_timestamp(
                        item.get("publishedAt"), fetched_at
                    ),
                })
        else:
            self._logger.debug("No parser matched payload from source %s", source)

        return news_items

    @classmethod
    def _first_items(cls, entries: Any) -> List[Dict[str, Any]]:
        """Return the leading dict entries of a payload list (or ``[]``)."""
        if not isinstance(entries, list):
            return []
        head = entries[: cls.MAX_ITEMS_PER_SOURCE]
        return [entry for entry in head if isinstance(entry, dict)]

    @classmethod
    def _summarize(cls, text: Any) -> str:
        """Trim ``text`` to ``SUMMARY_LENGTH``, marking truncation with "..."."""
        if not isinstance(text, str):
            return ""
        if len(text) <= cls.SUMMARY_LENGTH:
            return text
        return text[: cls.SUMMARY_LENGTH] + "..."

    @staticmethod
    def _parse_timestamp(value: Any, fallback: float) -> float:
        """Convert an ISO 8601 or RFC 2822 date string to POSIX seconds.

        Returns ``fallback`` when ``value`` is missing or unparseable. A
        parsed date without a UTC offset is interpreted as UTC.
        """
        if not isinstance(value, str) or not value.strip():
            return fallback
        text = value.strip()
        # datetime.fromisoformat() only accepts a trailing "Z" from 3.11 on.
        iso_text = text[:-1] + "+00:00" if text.endswith("Z") else text
        try:
            parsed = datetime.fromisoformat(iso_text)
        except ValueError:
            try:
                parsed = parsedate_to_datetime(text)
            except (TypeError, ValueError, IndexError):
                return fallback
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.timestamp()

    async def close(self):
        """Close the HTTP session, if any, and forget it."""
        if self.session is not None:
            await self.session.close()
            # Drop the reference so get_session() builds a fresh session.
            self.session = None
# Module-level shared aggregator instance. Constructing it opens no
# connection: the HTTP session is only created on the first fetch.
news_aggregator = CryptoNewsAggregator()