import requests
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import time

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, IntegrationError
from utils.storage import load_data, save_data

logger = setup_logger(__name__)

class NewsIntegration:
    """News API integration for current events"""

    def __init__(self, api_key: Optional[str] = None, provider: str = "newsapi"):
        """Initialize News API integration

        Args:
            api_key: API key for the news provider (optional)
            provider: News data provider (default: newsapi)
        """
        self.api_key = api_key
        self.provider = provider.lower()

        # Simple in-memory cache of processed responses, keyed by request parameters
        self.cache = {}
        self.cache_expiry = {}

        # Endpoint URLs and cache TTL (seconds) for each supported provider
        self.providers = {
            "newsapi": {
                "top_headlines_url": "https://newsapi.org/v2/top-headlines",
                "everything_url": "https://newsapi.org/v2/everything",
                "sources_url": "https://newsapi.org/v2/top-headlines/sources",
                "cache_duration": 1800
            },
            "gnews": {
                "top_headlines_url": "https://gnews.io/api/v4/top-headlines",
                "search_url": "https://gnews.io/api/v4/search",
                "cache_duration": 1800
            }
        }

    @handle_exceptions
    def set_api_key(self, api_key: str) -> None:
        """Set API key for the news provider

        Args:
            api_key: API key
        """
        self.api_key = api_key

        # Invalidate responses cached with the previous key
        self.cache = {}
        self.cache_expiry = {}

    @handle_exceptions
    def set_provider(self, provider: str) -> None:
        """Set news data provider

        Args:
            provider: News data provider

        Raises:
            IntegrationError: If the provider is not supported
        """
        provider = provider.lower()
        if provider not in self.providers:
            raise IntegrationError(f"Unsupported news provider: {provider}")

        self.provider = provider

        # Invalidate responses cached for the previous provider
        self.cache = {}
        self.cache_expiry = {}

    @handle_exceptions
    def test_connection(self) -> bool:
        """Test news API connection

        Returns:
            True if connection is successful, False otherwise
        """
        if not self.api_key:
            logger.error("News API key not set")
            return False

        try:
            if self.provider == "newsapi":
                # A minimal sources request is enough to validate the key
                params = {
                    "apiKey": self.api_key,
                    "language": "en"
                }
                response = requests.get(self.providers[self.provider]["sources_url"], params=params)

            elif self.provider == "gnews":
                # Request a single headline to validate the token
                params = {
                    "token": self.api_key,
                    "lang": "en",
                    "max": 1
                }
                response = requests.get(self.providers[self.provider]["top_headlines_url"], params=params)

            else:
                logger.error(f"Unsupported news provider: {self.provider}")
                return False

            return response.status_code == 200

        except Exception as e:
            logger.error(f"News API connection test failed: {str(e)}")
            return False

    @handle_exceptions
    def get_top_headlines(self, country: Optional[str] = None, category: Optional[str] = None,
                          sources: Optional[str] = None, query: Optional[str] = None,
                          page_size: int = 20, page: int = 1) -> Dict[str, Any]:
        """Get top headlines

        Args:
            country: Country code (optional)
            category: News category (optional)
            sources: Comma-separated list of sources (optional)
            query: Search query (optional)
            page_size: Number of results per page (default: 20)
            page: Page number (default: 1)

        Returns:
            Top headlines data
        """
        if not self.api_key:
            raise IntegrationError("News API key not set")

        # Return a cached result if it has not expired yet
        cache_key = f"headlines_{country}_{category}_{sources}_{query}_{page_size}_{page}_{self.provider}"
        if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
            return self.cache[cache_key]

        try:
            if self.provider == "newsapi":
                params = {
                    "apiKey": self.api_key,
                    "pageSize": page_size,
                    "page": page
                }

                # Add optional filters only when provided
                if country:
                    params["country"] = country
                if category:
                    params["category"] = category
                if sources:
                    params["sources"] = sources
                if query:
                    params["q"] = query

                response = requests.get(self.providers[self.provider]["top_headlines_url"], params=params)

                if response.status_code != 200:
                    raise IntegrationError(f"Failed to get top headlines: {response.text}")

                data = response.json()
                headlines = self._process_newsapi_headlines(data)

            elif self.provider == "gnews":
                # GNews maps category to "topic"; the sources and page parameters are not used here
                params = {
                    "token": self.api_key,
                    "max": page_size
                }

                if country:
                    params["country"] = country
                if category:
                    params["topic"] = category
                if query:
                    params["q"] = query

                response = requests.get(self.providers[self.provider]["top_headlines_url"], params=params)

                if response.status_code != 200:
                    raise IntegrationError(f"Failed to get top headlines: {response.text}")

                data = response.json()
                headlines = self._process_gnews_headlines(data)

            else:
                raise IntegrationError(f"Unsupported news provider: {self.provider}")

            # Cache the processed result for the provider's TTL
            self.cache[cache_key] = headlines
            self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]

            return headlines

        except Exception as e:
            if not isinstance(e, IntegrationError):
                logger.error(f"Failed to get top headlines: {str(e)}")
                raise IntegrationError(f"Failed to get top headlines: {str(e)}")
            raise

    @handle_exceptions
    def search_news(self, query: str, from_date: Optional[str] = None, to_date: Optional[str] = None,
                    language: str = "en", sort_by: str = "publishedAt",
                    page_size: int = 20, page: int = 1) -> Dict[str, Any]:
        """Search for news articles

        Args:
            query: Search query
            from_date: Start date (YYYY-MM-DD, optional)
            to_date: End date (YYYY-MM-DD, optional)
            language: Language code (default: en)
            sort_by: Sort order (relevancy, popularity, publishedAt)
            page_size: Number of results per page (default: 20)
            page: Page number (default: 1)

        Returns:
            Search results
        """
        if not self.api_key:
            raise IntegrationError("News API key not set")

        # Return a cached result if it has not expired yet
        cache_key = f"search_{query}_{from_date}_{to_date}_{language}_{sort_by}_{page_size}_{page}_{self.provider}"
        if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
            return self.cache[cache_key]

        try:
            if self.provider == "newsapi":
                params = {
                    "apiKey": self.api_key,
                    "q": query,
                    "language": language,
                    "sortBy": sort_by,
                    "pageSize": page_size,
                    "page": page
                }

                if from_date:
                    params["from"] = from_date
                if to_date:
                    params["to"] = to_date

                response = requests.get(self.providers[self.provider]["everything_url"], params=params)

                if response.status_code != 200:
                    raise IntegrationError(f"Failed to search news: {response.text}")

                data = response.json()
                search_results = self._process_newsapi_headlines(data)

            elif self.provider == "gnews":
                params = {
                    "token": self.api_key,
                    "q": query,
                    "lang": language,
                    "max": page_size
                }

                # Convert YYYY-MM-DD dates to ISO timestamps; skip values that do not parse
                if from_date:
                    try:
                        from_datetime = datetime.strptime(from_date, "%Y-%m-%d")
                        params["from"] = from_datetime.isoformat()
                    except ValueError:
                        pass

                if to_date:
                    try:
                        to_datetime = datetime.strptime(to_date, "%Y-%m-%d")
                        params["to"] = to_datetime.isoformat()
                    except ValueError:
                        pass

                response = requests.get(self.providers[self.provider]["search_url"], params=params)

                if response.status_code != 200:
                    raise IntegrationError(f"Failed to search news: {response.text}")

                data = response.json()
                search_results = self._process_gnews_headlines(data)

            else:
                raise IntegrationError(f"Unsupported news provider: {self.provider}")

            # Cache the processed result for the provider's TTL
            self.cache[cache_key] = search_results
            self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]

            return search_results

        except Exception as e:
            if not isinstance(e, IntegrationError):
                logger.error(f"Failed to search news: {str(e)}")
                raise IntegrationError(f"Failed to search news: {str(e)}")
            raise

    @handle_exceptions
    def get_sources(self, category: Optional[str] = None, language: str = "en",
                    country: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get news sources

        Args:
            category: News category (optional)
            language: Language code (default: en)
            country: Country code (optional)

        Returns:
            List of news sources
        """
        if not self.api_key:
            raise IntegrationError("News API key not set")

        # Return a cached result if it has not expired yet
        cache_key = f"sources_{category}_{language}_{country}_{self.provider}"
        if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
            return self.cache[cache_key]

        try:
            if self.provider == "newsapi":
                params = {
                    "apiKey": self.api_key,
                    "language": language
                }

                if category:
                    params["category"] = category
                if country:
                    params["country"] = country

                response = requests.get(self.providers[self.provider]["sources_url"], params=params)

                if response.status_code != 200:
                    raise IntegrationError(f"Failed to get sources: {response.text}")

                data = response.json()
                sources = data.get("sources", [])

            elif self.provider == "gnews":
                # Listing sources is not supported for GNews in this integration
                sources = []

            else:
                raise IntegrationError(f"Unsupported news provider: {self.provider}")

            # Cache the result for the provider's TTL
            self.cache[cache_key] = sources
            self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]

            return sources

        except Exception as e:
            if not isinstance(e, IntegrationError):
                logger.error(f"Failed to get sources: {str(e)}")
                raise IntegrationError(f"Failed to get sources: {str(e)}")
            raise

    @handle_exceptions
    def get_categories(self) -> List[str]:
        """Get available news categories

        Returns:
            List of news categories
        """
        if self.provider == "newsapi":
            return ["business", "entertainment", "general", "health", "science", "sports", "technology"]
        elif self.provider == "gnews":
            return ["general", "world", "nation", "business", "technology", "entertainment", "sports", "science", "health"]
        else:
            return []

    def _process_newsapi_headlines(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process NewsAPI headlines data

        Args:
            data: Raw API response data

        Returns:
            Processed headlines data
        """
        articles = []

        for article in data.get("articles", []):
            # Normalize each article into the shared schema used by both providers
            processed_article = {
                "title": article.get("title", ""),
                "description": article.get("description", ""),
                "content": article.get("content", ""),
                "url": article.get("url", ""),
                "image_url": article.get("urlToImage", ""),
                "published_at": article.get("publishedAt", ""),
                "source": {
                    "id": article.get("source", {}).get("id", ""),
                    "name": article.get("source", {}).get("name", "")
                },
                "author": article.get("author", "")
            }

            articles.append(processed_article)

        return {
            "status": data.get("status", ""),
            "total_results": data.get("totalResults", 0),
            "articles": articles
        }

    def _process_gnews_headlines(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process GNews headlines data

        Args:
            data: Raw API response data

        Returns:
            Processed headlines data
        """
        articles = []

        for article in data.get("articles", []):
            # Normalize into the same schema as the NewsAPI processor; source id and
            # author are not populated for GNews responses in this integration
            processed_article = {
                "title": article.get("title", ""),
                "description": article.get("description", ""),
                "content": article.get("content", ""),
                "url": article.get("url", ""),
                "image_url": article.get("image", ""),
                "published_at": article.get("publishedAt", ""),
                "source": {
                    "id": "",
                    "name": article.get("source", {}).get("name", "")
                },
                "author": ""
            }

            articles.append(processed_article)

        return {
            "status": "ok" if "articles" in data else "error",
            "total_results": len(data.get("articles", [])),
            "articles": articles
        }
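

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the integration itself).
# "YOUR_NEWSAPI_KEY" is a placeholder; a real key would come from configuration.
# The printed fields follow the processed-article schema produced above.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    news = NewsIntegration(api_key="YOUR_NEWSAPI_KEY", provider="newsapi")

    if news.test_connection():
        # Top US technology headlines, first five results
        headlines = news.get_top_headlines(country="us", category="technology", page_size=5)
        for article in headlines["articles"]:
            print(f"{article['published_at']}  {article['title']}  ({article['source']['name']})")

        # Search the last week of coverage for a topic
        week_ago = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
        results = news.search_news("artificial intelligence", from_date=week_ago, page_size=5)
        print(f"Found {results['total_results']} matching articles")
    else:
        print("Connection test failed; check the API key and provider")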