import json
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import requests

from utils.error_handling import IntegrationError, handle_exceptions
from utils.logging import setup_logger
from utils.storage import load_data, save_data

# Initialize logger
logger = setup_logger(__name__)


class NewsIntegration:
    """News API integration for current events.

    Wraps the NewsAPI and GNews HTTP APIs behind one interface and converts
    their responses into a common dictionary layout. Successful results are
    cached in memory per instance for the provider's ``cache_duration``.

    Public methods are wrapped with ``handle_exceptions`` (imported from
    ``utils.error_handling``); its behaviour is not visible in this file, so
    how an ``IntegrationError`` raised inside a method reaches the caller
    depends on that decorator.
    """

    # Class-level fallback so instances created without running __init__
    # (e.g. subclasses that override it) still have a request timeout.
    timeout: float = 10.0

    def __init__(self, api_key: Optional[str] = None, provider: str = "newsapi",
                 timeout: float = 10.0):
        """Initialize News API integration

        Args:
            api_key: API key for the news provider (optional)
            provider: News data provider (default: newsapi)
            timeout: Seconds to wait for each HTTP request before giving up
                (default: 10.0)
        """
        self.api_key = api_key
        self.provider = provider.lower()
        self.timeout = timeout
        self.cache = {}
        self.cache_expiry = {}

        # Set up provider-specific configurations
        self.providers = {
            "newsapi": {
                "top_headlines_url": "https://newsapi.org/v2/top-headlines",
                "everything_url": "https://newsapi.org/v2/everything",
                "sources_url": "https://newsapi.org/v2/top-headlines/sources",
                "cache_duration": 1800,  # 30 minutes
            },
            "gnews": {
                "top_headlines_url": "https://gnews.io/api/v4/top-headlines",
                "search_url": "https://gnews.io/api/v4/search",
                "cache_duration": 1800,  # 30 minutes
            },
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _cache_get(self, cache_key: str) -> Optional[Any]:
        """Return the unexpired cached value for ``cache_key``, or None.

        Expired entries are removed so the cache does not keep stale data
        around indefinitely. Cached values are never None, so None always
        means "miss".
        """
        if cache_key not in self.cache:
            return None
        if time.time() < self.cache_expiry.get(cache_key, 0):
            return self.cache[cache_key]
        # Entry exists but has expired: evict it.
        self.cache.pop(cache_key, None)
        self.cache_expiry.pop(cache_key, None)
        return None

    def _cache_set(self, cache_key: str, value: Any) -> None:
        """Store ``value`` under ``cache_key`` with the provider's expiry."""
        self.cache[cache_key] = value
        self.cache_expiry[cache_key] = (
            time.time() + self.providers[self.provider]["cache_duration"]
        )

    def _fetch_json(self, url_key: str, params: Dict[str, Any],
                    action: str) -> Dict[str, Any]:
        """GET the current provider's ``url_key`` endpoint and decode JSON.

        Args:
            url_key: Key into the current provider's configuration dict
            params: Query-string parameters
            action: Verb phrase used in the error message ("Failed to <action>")

        Returns:
            Decoded JSON body

        Raises:
            IntegrationError: If the response status is not 200
        """
        url = self.providers[self.provider][url_key]
        # A timeout is always passed; without one requests waits forever.
        response = requests.get(url, params=params, timeout=self.timeout)
        if response.status_code != 200:
            raise IntegrationError(f"Failed to {action}: {response.text}")
        return response.json()

    @staticmethod
    def _gnews_date(value: str, label: str) -> Optional[str]:
        """Convert a ``YYYY-MM-DD`` string to the timestamp form GNews expects.

        Returns None (after logging a warning) when ``value`` cannot be
        parsed, so the caller can skip the filter rather than fail.
        """
        try:
            parsed = datetime.strptime(value, "%Y-%m-%d")
        except (ValueError, TypeError):
            logger.warning(
                f"Ignoring invalid {label} {value!r}; expected YYYY-MM-DD"
            )
            return None
        # GNews documents YYYY-MM-DDThh:mm:ssZ for its from/to parameters.
        return parsed.strftime("%Y-%m-%dT%H:%M:%SZ")

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------

    @handle_exceptions
    def set_api_key(self, api_key: str) -> None:
        """Set API key for the news provider

        Args:
            api_key: API key
        """
        self.api_key = api_key

        # Clear cache when API key changes
        self.cache = {}
        self.cache_expiry = {}

    @handle_exceptions
    def set_provider(self, provider: str) -> None:
        """Set news data provider

        Args:
            provider: News data provider

        Raises:
            IntegrationError: If the provider is not configured
        """
        provider = provider.lower()
        if provider not in self.providers:
            raise IntegrationError(f"Unsupported news provider: {provider}")

        self.provider = provider

        # Clear cache when provider changes
        self.cache = {}
        self.cache_expiry = {}

    @handle_exceptions
    def test_connection(self) -> bool:
        """Test news API connection

        Returns:
            True if the provider answered with HTTP 200, False otherwise
            (including a missing API key, an unsupported provider, or any
            request failure)
        """
        if not self.api_key:
            logger.error("News API key not set")
            return False

        if self.provider == "newsapi":
            # Test with a simple sources request
            url_key = "sources_url"
            params = {"apiKey": self.api_key, "language": "en"}
        elif self.provider == "gnews":
            # Test with a simple top headlines request
            url_key = "top_headlines_url"
            params = {"token": self.api_key, "lang": "en", "max": 1}
        else:
            logger.error(f"Unsupported news provider: {self.provider}")
            return False

        try:
            response = requests.get(
                self.providers[self.provider][url_key],
                params=params,
                timeout=self.timeout,
            )
            return response.status_code == 200
        except Exception as e:
            logger.error(f"News API connection test failed: {str(e)}")
            return False

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    @handle_exceptions
    def get_top_headlines(self, country: Optional[str] = None,
                          category: Optional[str] = None,
                          sources: Optional[str] = None,
                          query: Optional[str] = None,
                          page_size: int = 20, page: int = 1) -> Dict[str, Any]:
        """Get top headlines

        Args:
            country: Country code (optional)
            category: News category (optional)
            sources: Comma-separated list of sources (optional; only sent to
                the newsapi provider)
            query: Search query (optional)
            page_size: Number of results per page (default: 20)
            page: Page number (default: 1)

        Returns:
            Top headlines data with keys "status", "total_results", "articles"

        Raises:
            IntegrationError: If the API key is missing, the provider is
                unsupported, or the request fails
        """
        if not self.api_key:
            raise IntegrationError("News API key not set")

        # Check cache
        cache_key = f"headlines_{country}_{category}_{sources}_{query}_{page_size}_{page}_{self.provider}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached

        try:
            if self.provider == "newsapi":
                params = {
                    "apiKey": self.api_key,
                    "pageSize": page_size,
                    "page": page,
                }
                # Add optional parameters (only those that were supplied)
                optional = {
                    "country": country,
                    "category": category,
                    "sources": sources,
                    "q": query,
                }
                params.update({k: v for k, v in optional.items() if v})

                data = self._fetch_json(
                    "top_headlines_url", params, "get top headlines"
                )
                headlines = self._process_newsapi_headlines(data)

            elif self.provider == "gnews":
                # NOTE(review): `page` is part of the cache key but is not
                # sent to GNews here, so every page returns the same
                # articles - confirm whether pagination should be forwarded.
                params = {
                    "token": self.api_key,
                    "max": page_size,
                }
                # Add optional parameters (GNews calls the category "topic")
                optional = {
                    "country": country,
                    "topic": category,
                    "q": query,
                }
                params.update({k: v for k, v in optional.items() if v})

                data = self._fetch_json(
                    "top_headlines_url", params, "get top headlines"
                )
                headlines = self._process_gnews_headlines(data)

            else:
                raise IntegrationError(f"Unsupported news provider: {self.provider}")

            # Cache the result
            self._cache_set(cache_key, headlines)
            return headlines

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"Failed to get top headlines: {str(e)}")
            raise IntegrationError(f"Failed to get top headlines: {str(e)}") from e

    @handle_exceptions
    def search_news(self, query: str, from_date: Optional[str] = None,
                    to_date: Optional[str] = None, language: str = "en",
                    sort_by: str = "publishedAt", page_size: int = 20,
                    page: int = 1) -> Dict[str, Any]:
        """Search for news articles

        Args:
            query: Search query
            from_date: Start date (YYYY-MM-DD, optional)
            to_date: End date (YYYY-MM-DD, optional)
            language: Language code (default: en)
            sort_by: Sort order (relevancy, popularity, publishedAt); only
                sent to the newsapi provider
            page_size: Number of results per page (default: 20)
            page: Page number (default: 1)

        Returns:
            Search results with keys "status", "total_results", "articles"

        Raises:
            IntegrationError: If the API key is missing, the provider is
                unsupported, or the request fails
        """
        if not self.api_key:
            raise IntegrationError("News API key not set")

        # Check cache
        cache_key = f"search_{query}_{from_date}_{to_date}_{language}_{sort_by}_{page_size}_{page}_{self.provider}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached

        try:
            if self.provider == "newsapi":
                params = {
                    "apiKey": self.api_key,
                    "q": query,
                    "language": language,
                    "sortBy": sort_by,
                    "pageSize": page_size,
                    "page": page,
                }
                # Add optional parameters
                if from_date:
                    params["from"] = from_date
                if to_date:
                    params["to"] = to_date

                data = self._fetch_json("everything_url", params, "search news")
                search_results = self._process_newsapi_headlines(data)

            elif self.provider == "gnews":
                params = {
                    "token": self.api_key,
                    "q": query,
                    "lang": language,
                    "max": page_size,
                }
                # Add optional date filters; an unparseable date is logged
                # and skipped instead of failing the whole search.
                if from_date:
                    converted = self._gnews_date(from_date, "from_date")
                    if converted is not None:
                        params["from"] = converted
                if to_date:
                    converted = self._gnews_date(to_date, "to_date")
                    if converted is not None:
                        params["to"] = converted

                data = self._fetch_json("search_url", params, "search news")
                search_results = self._process_gnews_headlines(data)

            else:
                raise IntegrationError(f"Unsupported news provider: {self.provider}")

            # Cache the result
            self._cache_set(cache_key, search_results)
            return search_results

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"Failed to search news: {str(e)}")
            raise IntegrationError(f"Failed to search news: {str(e)}") from e

    @handle_exceptions
    def get_sources(self, category: Optional[str] = None, language: str = "en",
                    country: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get news sources

        Args:
            category: News category (optional)
            language: Language code (default: en)
            country: Country code (optional)

        Returns:
            List of news sources (always empty for the gnews provider)

        Raises:
            IntegrationError: If the API key is missing, the provider is
                unsupported, or the request fails
        """
        if not self.api_key:
            raise IntegrationError("News API key not set")

        # Check cache
        cache_key = f"sources_{category}_{language}_{country}_{self.provider}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached

        try:
            if self.provider == "newsapi":
                params = {
                    "apiKey": self.api_key,
                    "language": language,
                }
                # Add optional parameters
                if category:
                    params["category"] = category
                if country:
                    params["country"] = country

                data = self._fetch_json("sources_url", params, "get sources")
                sources = data.get("sources", [])

            elif self.provider == "gnews":
                # GNews doesn't have a sources endpoint, so we'll return a placeholder
                sources = []

            else:
                raise IntegrationError(f"Unsupported news provider: {self.provider}")

            # Cache the result
            self._cache_set(cache_key, sources)
            return sources

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"Failed to get sources: {str(e)}")
            raise IntegrationError(f"Failed to get sources: {str(e)}") from e

    @handle_exceptions
    def get_categories(self) -> List[str]:
        """Get available news categories

        Returns:
            List of news categories for the current provider, or an empty
            list if the provider is not recognised
        """
        if self.provider == "newsapi":
            return ["business", "entertainment", "general", "health",
                    "science", "sports", "technology"]
        if self.provider == "gnews":
            return ["general", "world", "nation", "business", "technology",
                    "entertainment", "sports", "science", "health"]
        return []

    # ------------------------------------------------------------------
    # Response normalisation
    # ------------------------------------------------------------------

    def _process_newsapi_headlines(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process NewsAPI headlines data

        Args:
            data: Raw API response data

        Returns:
            Processed headlines data
        """
        articles = []
        for article in data.get("articles", []):
            # A null "source" would otherwise break the nested lookups.
            source = article.get("source") or {}
            articles.append({
                "title": article.get("title", ""),
                "description": article.get("description", ""),
                "content": article.get("content", ""),
                "url": article.get("url", ""),
                "image_url": article.get("urlToImage", ""),
                "published_at": article.get("publishedAt", ""),
                "source": {
                    "id": source.get("id", ""),
                    "name": source.get("name", ""),
                },
                "author": article.get("author", ""),
            })

        return {
            "status": data.get("status", ""),
            "total_results": data.get("totalResults", 0),
            "articles": articles,
        }

    def _process_gnews_headlines(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process GNews headlines data

        Args:
            data: Raw API response data

        Returns:
            Processed headlines data
        """
        raw_articles = data.get("articles", [])
        articles = []
        for article in raw_articles:
            # A null "source" would otherwise break the nested lookup.
            source = article.get("source") or {}
            articles.append({
                "title": article.get("title", ""),
                "description": article.get("description", ""),
                "content": article.get("content", ""),
                "url": article.get("url", ""),
                "image_url": article.get("image", ""),
                "published_at": article.get("publishedAt", ""),
                "source": {
                    "id": "",  # GNews doesn't provide source ID
                    "name": source.get("name", ""),
                },
                "author": "",  # GNews doesn't provide author information
            })

        return {
            "status": "ok" if "articles" in data else "error",
            # Prefer the API's overall match count so the value means the
            # same thing as NewsAPI's totalResults; fall back to page length.
            "total_results": data.get("totalArticles", len(raw_articles)),
            "articles": articles,
        }