# Non-code page-header residue, kept as a comment: uploader "mrradix",
# commit message "Upload 48 files", commit 8e4018d (verified).
import requests
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import time
from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, IntegrationError
from utils.storage import load_data, save_data
# Module-level logger shared by everything in this file; obtained from the
# project's setup_logger helper, keyed by this module's __name__.
logger = setup_logger(__name__)
class NewsIntegration:
    """News API integration for current events.

    Talks to one of two HTTP providers, NewsAPI (``"newsapi"``) or GNews
    (``"gnews"``), and converts their article payloads into one common
    dictionary layout (see ``_process_newsapi_headlines`` and
    ``_process_gnews_headlines``).

    Successful results are cached in memory on the instance, keyed by the
    request arguments plus the provider name, and expire after the
    provider's ``cache_duration`` (seconds). The cache is dropped whenever
    the API key or the provider changes.

    Public methods are wrapped in the project's ``handle_exceptions``
    decorator; its behavior is defined in ``utils.error_handling``.
    """

    # Seconds to wait for a provider response when the caller does not pass
    # a timeout. Without any timeout ``requests`` can block indefinitely.
    DEFAULT_TIMEOUT = 10.0

    def __init__(self, api_key: Optional[str] = None, provider: str = "newsapi",
                 timeout: float = DEFAULT_TIMEOUT):
        """Initialize News API integration

        Args:
            api_key: API key for the news provider (optional)
            provider: News data provider (default: newsapi)
            timeout: Seconds to wait for each HTTP request (default: 10)
        """
        self.api_key = api_key
        self.provider = provider.lower()
        self.timeout = timeout
        self.cache = {}
        self.cache_expiry = {}

        # Set up provider-specific configurations
        self.providers = {
            "newsapi": {
                "top_headlines_url": "https://newsapi.org/v2/top-headlines",
                "everything_url": "https://newsapi.org/v2/everything",
                "sources_url": "https://newsapi.org/v2/top-headlines/sources",
                "cache_duration": 1800,  # 30 minutes
            },
            "gnews": {
                "top_headlines_url": "https://gnews.io/api/v4/top-headlines",
                "search_url": "https://gnews.io/api/v4/search",
                "cache_duration": 1800,  # 30 minutes
            },
        }

        # Construction stays permissive for backward compatibility (the data
        # methods raise IntegrationError later), but surface the problem early.
        if self.provider not in self.providers:
            logger.warning(f"Unsupported news provider: {self.provider}")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _clear_cache(self) -> None:
        """Drop every cached response together with its expiry timestamp."""
        self.cache = {}
        self.cache_expiry = {}

    def _is_cached(self, cache_key: str) -> bool:
        """Return True when ``cache_key`` holds an entry that has not expired."""
        return cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0)

    def _store_cache(self, cache_key: str, value: Any) -> Any:
        """Cache ``value`` under ``cache_key`` for the active provider's
        ``cache_duration`` and return ``value`` unchanged."""
        self.cache[cache_key] = value
        self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]
        return value

    def _fetch_json(self, endpoint: str, params: Dict[str, Any], action: str) -> Dict[str, Any]:
        """GET one of the active provider's endpoints and decode the JSON body.

        Args:
            endpoint: Key of the URL inside ``self.providers[self.provider]``
            params: Query-string parameters
            action: Verb phrase used in the error message ("Failed to <action>: ...")

        Returns:
            Decoded JSON response

        Raises:
            IntegrationError: If the response status code is not 200
        """
        url = self.providers[self.provider][endpoint]
        response = requests.get(url, params=params, timeout=self.timeout)
        if response.status_code != 200:
            raise IntegrationError(f"Failed to {action}: {response.text}")
        return response.json()

    @staticmethod
    def _to_gnews_datetime(date_str: str) -> Optional[str]:
        """Convert ``YYYY-MM-DD`` into the UTC timestamp form used for GNews
        ``from``/``to`` filters (``YYYY-MM-DDThh:mm:ssZ``).

        Returns:
            The converted timestamp, or None when ``date_str`` cannot be
            parsed (the caller then simply omits that filter).
        """
        try:
            parsed = datetime.strptime(date_str, "%Y-%m-%d")
        except (TypeError, ValueError):
            # Best effort: an unusable date drops the filter instead of
            # failing the whole search, but it is no longer silent.
            logger.warning(f"Ignoring invalid date (expected YYYY-MM-DD): {date_str!r}")
            return None
        return parsed.strftime("%Y-%m-%dT%H:%M:%SZ")

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------

    @handle_exceptions
    def set_api_key(self, api_key: str) -> None:
        """Set API key for the news provider

        Args:
            api_key: API key
        """
        self.api_key = api_key
        # Clear cache when API key changes
        self._clear_cache()

    @handle_exceptions
    def set_provider(self, provider: str) -> None:
        """Set news data provider

        Args:
            provider: News data provider (matched case-insensitively)

        Raises:
            IntegrationError: If the provider is not configured in ``self.providers``
        """
        provider = provider.lower()
        if provider not in self.providers:
            raise IntegrationError(f"Unsupported news provider: {provider}")
        self.provider = provider
        # Clear cache when provider changes
        self._clear_cache()

    @handle_exceptions
    def test_connection(self) -> bool:
        """Test news API connection

        Issues one lightweight request (NewsAPI: sources listing; GNews: a
        single top headline) and checks for HTTP 200.

        Returns:
            True if connection is successful, False otherwise
        """
        if not self.api_key:
            logger.error("News API key not set")
            return False

        if self.provider == "newsapi":
            # Test with a simple sources request
            endpoint = "sources_url"
            params = {"apiKey": self.api_key, "language": "en"}
        elif self.provider == "gnews":
            # Test with a simple top headlines request
            endpoint = "top_headlines_url"
            params = {"token": self.api_key, "lang": "en", "max": 1}
        else:
            logger.error(
                f"News API connection test failed: Unsupported news provider: {self.provider}"
            )
            return False

        try:
            response = requests.get(
                self.providers[self.provider][endpoint],
                params=params,
                timeout=self.timeout,
            )
            return response.status_code == 200
        except Exception as e:
            # This is a yes/no probe: any failure (DNS, TLS, timeout, ...)
            # is reported as "not connected" rather than raised.
            logger.error(f"News API connection test failed: {str(e)}")
            return False

    # ------------------------------------------------------------------
    # Data retrieval
    # ------------------------------------------------------------------

    @handle_exceptions
    def get_top_headlines(self, country: Optional[str] = None, category: Optional[str] = None,
                          sources: Optional[str] = None, query: Optional[str] = None,
                          page_size: int = 20, page: int = 1) -> Dict[str, Any]:
        """Get top headlines

        Args:
            country: Country code (optional)
            category: News category (optional)
            sources: Comma-separated list of sources (optional; not sent to GNews)
            query: Search query (optional)
            page_size: Number of results per page (default: 20)
            page: Page number (default: 1; not sent to GNews)

        Returns:
            Top headlines data with keys "status", "total_results" and "articles"

        Raises:
            IntegrationError: If no API key is set, the provider is
                unsupported, or the request fails
        """
        if not self.api_key:
            raise IntegrationError("News API key not set")

        # Check cache
        cache_key = f"headlines_{country}_{category}_{sources}_{query}_{page_size}_{page}_{self.provider}"
        if self._is_cached(cache_key):
            return self.cache[cache_key]

        try:
            if self.provider == "newsapi":
                params = {
                    "apiKey": self.api_key,
                    "pageSize": page_size,
                    "page": page,
                }
                # Add optional parameters (only those that were supplied)
                optional = {"country": country, "category": category, "sources": sources, "q": query}
                params.update({name: value for name, value in optional.items() if value})

                data = self._fetch_json("top_headlines_url", params, "get top headlines")
                # Process data into a standardized format
                headlines = self._process_newsapi_headlines(data)
            elif self.provider == "gnews":
                params = {
                    "token": self.api_key,
                    "max": page_size,
                }
                # Add optional parameters (only those that were supplied)
                optional = {"country": country, "topic": category, "q": query}
                params.update({name: value for name, value in optional.items() if value})

                data = self._fetch_json("top_headlines_url", params, "get top headlines")
                # Process data into a standardized format
                headlines = self._process_gnews_headlines(data)
            else:
                raise IntegrationError(f"Unsupported news provider: {self.provider}")

            # Cache the result
            return self._store_cache(cache_key, headlines)
        except IntegrationError:
            # Already carries a meaningful message; pass it through untouched.
            raise
        except Exception as e:
            logger.error(f"Failed to get top headlines: {str(e)}")
            raise IntegrationError(f"Failed to get top headlines: {str(e)}") from e

    @handle_exceptions
    def search_news(self, query: str, from_date: Optional[str] = None, to_date: Optional[str] = None,
                    language: str = "en", sort_by: str = "publishedAt",
                    page_size: int = 20, page: int = 1) -> Dict[str, Any]:
        """Search for news articles

        Args:
            query: Search query
            from_date: Start date (YYYY-MM-DD, optional)
            to_date: End date (YYYY-MM-DD, optional)
            language: Language code (default: en)
            sort_by: Sort order (relevancy, popularity, publishedAt; not sent to GNews)
            page_size: Number of results per page (default: 20)
            page: Page number (default: 1; not sent to GNews)

        Returns:
            Search results with keys "status", "total_results" and "articles"

        Raises:
            IntegrationError: If no API key is set, the provider is
                unsupported, or the request fails
        """
        if not self.api_key:
            raise IntegrationError("News API key not set")

        # Check cache
        cache_key = f"search_{query}_{from_date}_{to_date}_{language}_{sort_by}_{page_size}_{page}_{self.provider}"
        if self._is_cached(cache_key):
            return self.cache[cache_key]

        try:
            if self.provider == "newsapi":
                params = {
                    "apiKey": self.api_key,
                    "q": query,
                    "language": language,
                    "sortBy": sort_by,
                    "pageSize": page_size,
                    "page": page,
                }
                # Add optional parameters
                if from_date:
                    params["from"] = from_date
                if to_date:
                    params["to"] = to_date

                data = self._fetch_json("everything_url", params, "search news")
                # Process data into a standardized format
                search_results = self._process_newsapi_headlines(data)
            elif self.provider == "gnews":
                params = {
                    "token": self.api_key,
                    "q": query,
                    "lang": language,
                    "max": page_size,
                }
                # Add optional date filters, converted from YYYY-MM-DD to the
                # timestamp format GNews expects; unparsable dates are skipped.
                for name, raw_date in (("from", from_date), ("to", to_date)):
                    if not raw_date:
                        continue
                    converted = self._to_gnews_datetime(raw_date)
                    if converted is not None:
                        params[name] = converted

                data = self._fetch_json("search_url", params, "search news")
                # Process data into a standardized format
                search_results = self._process_gnews_headlines(data)
            else:
                raise IntegrationError(f"Unsupported news provider: {self.provider}")

            # Cache the result
            return self._store_cache(cache_key, search_results)
        except IntegrationError:
            # Already carries a meaningful message; pass it through untouched.
            raise
        except Exception as e:
            logger.error(f"Failed to search news: {str(e)}")
            raise IntegrationError(f"Failed to search news: {str(e)}") from e

    @handle_exceptions
    def get_sources(self, category: Optional[str] = None, language: str = "en",
                    country: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get news sources

        Args:
            category: News category (optional)
            language: Language code (default: en)
            country: Country code (optional)

        Returns:
            List of news sources as returned by NewsAPI; always an empty
            list for GNews, for which no sources endpoint is configured

        Raises:
            IntegrationError: If no API key is set, the provider is
                unsupported, or the request fails
        """
        if not self.api_key:
            raise IntegrationError("News API key not set")

        # Check cache
        cache_key = f"sources_{category}_{language}_{country}_{self.provider}"
        if self._is_cached(cache_key):
            return self.cache[cache_key]

        try:
            if self.provider == "newsapi":
                params = {
                    "apiKey": self.api_key,
                    "language": language,
                }
                # Add optional parameters
                if category:
                    params["category"] = category
                if country:
                    params["country"] = country

                data = self._fetch_json("sources_url", params, "get sources")
                # Process data
                sources = data.get("sources", [])
            elif self.provider == "gnews":
                # GNews doesn't have a sources endpoint, so we'll return a placeholder
                sources = []
            else:
                raise IntegrationError(f"Unsupported news provider: {self.provider}")

            # Cache the result
            return self._store_cache(cache_key, sources)
        except IntegrationError:
            # Already carries a meaningful message; pass it through untouched.
            raise
        except Exception as e:
            logger.error(f"Failed to get sources: {str(e)}")
            raise IntegrationError(f"Failed to get sources: {str(e)}") from e

    @handle_exceptions
    def get_categories(self) -> List[str]:
        """Get available news categories

        Returns:
            List of news categories for the active provider (empty for an
            unsupported provider)
        """
        if self.provider == "newsapi":
            return ["business", "entertainment", "general", "health", "science", "sports", "technology"]
        if self.provider == "gnews":
            return ["general", "world", "nation", "business", "technology", "entertainment", "sports", "science", "health"]
        return []

    # ------------------------------------------------------------------
    # Response normalization
    # ------------------------------------------------------------------

    def _process_newsapi_headlines(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process NewsAPI headlines data

        Args:
            data: Raw API response data

        Returns:
            Processed headlines data: "status", "total_results" and a list
            of "articles" in the standardized layout
        """
        articles = []
        # "articles" and "source" may be present but null in the JSON, so
        # fall back to empty containers instead of calling into None.
        for article in data.get("articles") or []:
            source = article.get("source") or {}
            articles.append({
                "title": article.get("title", ""),
                "description": article.get("description", ""),
                "content": article.get("content", ""),
                "url": article.get("url", ""),
                "image_url": article.get("urlToImage", ""),
                "published_at": article.get("publishedAt", ""),
                "source": {
                    "id": source.get("id", ""),
                    "name": source.get("name", ""),
                },
                "author": article.get("author", ""),
            })

        return {
            "status": data.get("status", ""),
            "total_results": data.get("totalResults", 0),
            "articles": articles,
        }

    def _process_gnews_headlines(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process GNews headlines data

        Args:
            data: Raw API response data

        Returns:
            Processed headlines data: "status", "total_results" and a list
            of "articles" in the standardized layout
        """
        articles = []
        # "articles" and "source" may be present but null in the JSON, so
        # fall back to empty containers instead of calling into None.
        for article in data.get("articles") or []:
            source = article.get("source") or {}
            articles.append({
                "title": article.get("title", ""),
                "description": article.get("description", ""),
                "content": article.get("content", ""),
                "url": article.get("url", ""),
                "image_url": article.get("image", ""),
                "published_at": article.get("publishedAt", ""),
                "source": {
                    "id": "",  # GNews doesn't provide source ID
                    "name": source.get("name", ""),
                },
                "author": "",  # GNews doesn't provide author information
            })

        return {
            "status": "ok" if "articles" in data else "error",
            # Prefer the provider's overall match count so the field means
            # the same thing as for NewsAPI; fall back to the page length.
            "total_results": data.get("totalArticles", len(articles)),
            "articles": articles,
        }