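"""Newsletter news scraper: pulls articles for a query from Google News and other
financial RSS feeds, deduplicates them, filters for relevance and language, and
extracts full article text with trafilatura."""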
import requests
from bs4 import BeautifulSoup
import feedparser
import trafilatura
from urllib.parse import urljoin, urlparse, quote
import time
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Set
import hashlib
import re
from langdetect import detect
import random
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logger = logging.getLogger(__name__)

class NewsletterScraper:
    """Robust news scraper with multiple sources and deduplication"""

    def __init__(self):
        self.session = self._create_session()
        self.scraped_urls: Set[str] = set()
        self.content_hashes: Set[str] = set()

        # News sources configuration
        self.rss_sources = {
            'google_news': 'https://news.google.com/rss/search?q={}&hl=en&gl=US&ceid=US:en',
            'yahoo_finance': 'https://feeds.finance.yahoo.com/rss/2.0/headline',
            'reuters_business': 'https://www.reutersagency.com/feed/?best-topics=business-finance&post_type=best',
            'bbc_business': 'http://feeds.bbci.co.uk/news/business/rss.xml',
            'cnbc': 'https://www.cnbc.com/id/100003114/device/rss/rss.html',
            'marketwatch': 'http://feeds.marketwatch.com/marketwatch/topstories/',
            'financial_times': 'https://www.ft.com/rss/home',
            'bloomberg': 'https://feeds.bloomberg.com/politics/news.rss'
        }

        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0'
        ]

        logger.info("NewsletterScraper initialized")

    def _create_session(self) -> requests.Session:
        """Create a session with retry strategy"""
        session = requests.Session()

        # Retry strategy
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def _get_random_headers(self) -> Dict[str, str]:
        """Get randomized headers to avoid blocking"""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    def scrape_news(self, query: str, max_articles: int = 20) -> List[Dict]:
        """Main scraping function"""
        logger.info(f"Starting news scraping for query: {query}")

        all_articles = []
        self.scraped_urls.clear()
        self.content_hashes.clear()

        try:
            # Primary: Google News RSS
            google_articles = self._scrape_google_news(query, max_articles // 2)
            all_articles.extend(google_articles)

            # Secondary: Other RSS sources
            for source_name, rss_url in list(self.rss_sources.items())[1:4]:  # Limit to avoid timeouts
                if len(all_articles) >= max_articles:
                    break
                try:
                    source_articles = self._scrape_rss_source(rss_url, query, 5)
                    all_articles.extend(source_articles)
                except Exception as e:
                    logger.warning(f"Failed to scrape {source_name}: {str(e)}")
                    continue

            # Deduplicate and filter
            articles = self._deduplicate_articles(all_articles)
            articles = self._filter_articles(articles, query)
            articles = articles[:max_articles]

            # Extract full content
            for article in articles:
                try:
                    full_content = self._extract_full_content(article['url'])
                    if full_content and len(full_content) > 200:
                        article['content'] = full_content
                    else:
                        article['content'] = article.get('summary', article.get('title', ''))
                except Exception as e:
                    logger.warning(f"Failed to extract content from {article['url']}: {str(e)}")
                    article['content'] = article.get('summary', article.get('title', ''))

            # Filter by language (English only)
            articles = [article for article in articles if self._is_english(article['content'])]

            logger.info(f"Successfully scraped {len(articles)} articles")
            return articles

        except Exception as e:
            logger.error(f"Error in scrape_news: {str(e)}")
            return []

    def _scrape_google_news(self, query: str, max_articles: int) -> List[Dict]:
        """Scrape Google News RSS"""
        try:
            # URL-encode the full query (handles more than just spaces)
            url = self.rss_sources['google_news'].format(quote(query))
            headers = self._get_random_headers()
            response = self.session.get(url, headers=headers, timeout=10)

            if response.status_code != 200:
                logger.warning(f"Google News RSS returned status {response.status_code}")
                return []

            feed = feedparser.parse(response.content)
            articles = []

            for entry in feed.entries[:max_articles * 2]:  # Get extra for filtering
                try:
                    article = {
                        'title': entry.title,
                        'url': entry.link,
                        'summary': entry.get('summary', ''),
                        'date': self._parse_date(entry.get('published', '')),
                        'source': 'Google News'
                    }

                    # Skip if already seen
                    if article['url'] in self.scraped_urls:
                        continue

                    self.scraped_urls.add(article['url'])
                    articles.append(article)
                except Exception as e:
                    logger.warning(f"Error parsing Google News entry: {str(e)}")
                    continue

            return articles

        except Exception as e:
            logger.error(f"Error scraping Google News: {str(e)}")
            return []

    def _scrape_rss_source(self, rss_url: str, query: str, max_articles: int) -> List[Dict]:
        """Scrape a generic RSS source"""
        try:
            headers = self._get_random_headers()
            response = self.session.get(rss_url, headers=headers, timeout=10)

            if response.status_code != 200:
                return []

            feed = feedparser.parse(response.content)
            articles = []
            query_lower = query.lower()

            for entry in feed.entries[:max_articles * 3]:  # Get extra for filtering
                try:
                    title = entry.get('title', '')
                    summary = entry.get('summary', '')

                    # Check if article is relevant to query
                    if not (query_lower in title.lower() or query_lower in summary.lower()):
                        continue

                    article = {
                        'title': title,
                        'url': entry.get('link', ''),
                        'summary': summary,
                        'date': self._parse_date(entry.get('published', '')),
                        'source': self._extract_source_name(rss_url)
                    }

                    # Skip if already seen
                    if article['url'] in self.scraped_urls:
                        continue

                    self.scraped_urls.add(article['url'])
                    articles.append(article)

                    if len(articles) >= max_articles:
                        break
                except Exception as e:
                    logger.warning(f"Error parsing RSS entry: {str(e)}")
                    continue

            # Small delay to be respectful
            time.sleep(0.5)
            return articles

        except Exception as e:
            logger.error(f"Error scraping RSS {rss_url}: {str(e)}")
            return []

    def _extract_full_content(self, url: str) -> Optional[str]:
        """Extract full article content using trafilatura"""
        try:
            headers = self._get_random_headers()

            # Download the page with the shared session (randomized headers + retries),
            # then hand the raw HTML to trafilatura for text extraction
            response = self.session.get(url, headers=headers, timeout=10)
            if response.status_code != 200 or not response.text:
                return None

            # Extract text content
            text = trafilatura.extract(
                response.text,
                include_comments=False,
                include_tables=False,
                include_formatting=False,
                no_fallback=False
            )

            if text and len(text.strip()) > 100:
                return text.strip()
            return None

        except Exception as e:
            logger.warning(f"Error extracting content from {url}: {str(e)}")
            return None

    def _deduplicate_articles(self, articles: List[Dict]) -> List[Dict]:
        """Remove duplicate articles based on content similarity"""
        unique_articles = []

        for article in articles:
            # Create content hash
            content_for_hash = f"{article['title']} {article.get('summary', '')}"
            content_hash = hashlib.md5(content_for_hash.encode()).hexdigest()

            if content_hash not in self.content_hashes:
                self.content_hashes.add(content_hash)
                unique_articles.append(article)

        logger.info(f"Deduplicated {len(articles)} -> {len(unique_articles)} articles")
        return unique_articles

    def _filter_articles(self, articles: List[Dict], query: str) -> List[Dict]:
        """Filter articles for relevance and quality"""
        filtered_articles = []
        query_lower = query.lower()

        for article in articles:
            # Check minimum content length
            title_summary = f"{article['title']} {article.get('summary', '')}"
            if len(title_summary.strip()) < 50:
                continue

            # Check relevance (more flexible than RSS filtering)
            if (query_lower in article['title'].lower() or
                    query_lower in article.get('summary', '').lower() or
                    any(word in article['title'].lower() for word in query_lower.split())):
                filtered_articles.append(article)

        logger.info(f"Filtered {len(articles)} -> {len(filtered_articles)} articles for relevance")
        return filtered_articles

    def _is_english(self, text: str) -> bool:
        """Check if text is in English using language detection"""
        try:
            if len(text.strip()) < 20:
                return True  # Assume short text is English

            detected_lang = detect(text[:1000])  # Check first 1000 chars
            return detected_lang == 'en'
        except Exception:
            # If detection fails, assume English
            return True

    def _parse_date(self, date_str: str) -> Optional[datetime]:
        """Parse date from RSS feed"""
        if not date_str:
            return datetime.now()

        try:
            # Try common RSS date formats (timezone name, numeric offset, or none)
            for fmt in ['%a, %d %b %Y %H:%M:%S %Z',
                        '%a, %d %b %Y %H:%M:%S %z',
                        '%Y-%m-%dT%H:%M:%SZ',
                        '%Y-%m-%d %H:%M:%S']:
                try:
                    return datetime.strptime(date_str.strip(), fmt)
                except ValueError:
                    continue

            # If all formats fail, return current time
            return datetime.now()
        except Exception:
            return datetime.now()

    def _extract_source_name(self, url: str) -> str:
        """Extract source name from URL"""
        try:
            domain = urlparse(url).netloc

            # Clean up common domain patterns
            domain = domain.replace('www.', '').replace('feeds.', '')

            # Map known domains to clean names
            domain_mapping = {
                'news.google.com': 'Google News',
                'finance.yahoo.com': 'Yahoo Finance',
                'reuters.com': 'Reuters',
                'reutersagency.com': 'Reuters',
                'bbc.co.uk': 'BBC',
                'cnbc.com': 'CNBC',
                'marketwatch.com': 'MarketWatch',
                'ft.com': 'Financial Times',
                'bloomberg.com': 'Bloomberg'
            }

            return domain_mapping.get(domain, domain.title())
        except Exception:
            return 'Unknown'

    def get_available_sources(self) -> List[str]:
        """Get list of available news sources"""
        return list(self.rss_sources.keys())


# Additional utility functions for scraping

def clean_html(html_content: str) -> str:
    """Clean HTML content and extract text"""
    try:
        soup = BeautifulSoup(html_content, 'html.parser')

        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.extract()

        # Get text
        text = soup.get_text()

        # Clean up whitespace
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        text = ' '.join(chunk for chunk in chunks if chunk)

        return text
    except Exception as e:
        logger.error(f"Error cleaning HTML: {str(e)}")
        return ""


def is_valid_article_url(url: str) -> bool:
    """Check if URL is likely to be a valid article URL"""
    try:
        parsed = urlparse(url)

        # Skip certain file types (check the path so query strings don't hide extensions)
        skip_extensions = ['.pdf', '.jpg', '.png', '.gif', '.mp4', '.mp3']
        if any(parsed.path.lower().endswith(ext) for ext in skip_extensions):
            return False

        # Skip obvious non-article URLs
        skip_patterns = ['login', 'register', 'subscribe', 'newsletter', 'sitemap']
        if any(pattern in url.lower() for pattern in skip_patterns):
            return False

        return True
    except Exception:
        return False
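

# Example usage (a minimal sketch): run this module directly to scrape a handful of
# articles and print their titles and sources. The query string and article limit
# below are illustrative values, not part of the scraper's configuration.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    scraper = NewsletterScraper()
    results = scraper.scrape_news("artificial intelligence", max_articles=5)

    for item in results:
        print(f"[{item['source']}] {item['title']}")
        print(f"    {item['url']}")
        print(f"    {len(item['content'])} characters of extracted content")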