import hashlib
import logging
import random
import re
import time
from datetime import datetime
from typing import Dict, List, Optional, Set
from urllib.parse import quote, urlparse

import feedparser
import requests
import trafilatura
from bs4 import BeautifulSoup
from langdetect import detect
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logger = logging.getLogger(__name__)


class NewsletterScraper:
    """Robust news scraper with multiple sources, redirect resolution, and deduplication."""

    def __init__(self):
        self.session = self._create_session()
        self.scraped_urls: Set[str] = set()
        self.content_hashes: Set[str] = set()

        # RSS feeds: 'google_news' is a query template; the rest are fixed topic feeds.
        self.rss_sources = {
            'google_news': 'https://news.google.com/rss/search?q={}&hl=en&gl=US&ceid=US:en',
            'yahoo_finance': 'https://feeds.finance.yahoo.com/rss/2.0/headline',
            'reuters_business': 'https://www.reutersagency.com/feed/?best-topics=business-finance&post_type=best',
            'bbc_business': 'http://feeds.bbci.co.uk/news/business/rss.xml',
            'cnbc': 'https://www.cnbc.com/id/100003114/device/rss/rss.html',
            'marketwatch': 'http://feeds.marketwatch.com/marketwatch/topstories/',
            'financial_times': 'https://www.ft.com/rss/home',
            'bloomberg': 'https://feeds.bloomberg.com/politics/news.rss',
        }

        # Desktop user agents rotated per request to reduce the chance of blocking.
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 12.6; rv:121.0) Gecko/20100101 Firefox/121.0',
        ]

        logger.info("NewsletterScraper initialized")

    def _create_session(self) -> requests.Session:
        """Create a session with a retry strategy and a default timeout."""
        session = requests.Session()

        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "HEAD"],
            raise_on_status=False,
        )

        adapter = HTTPAdapter(max_retries=retry_strategy, pool_connections=20, pool_maxsize=20)
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        # Ensure every request made through this session carries a timeout.
        session.request = self._with_timeout(session.request, timeout=10)
        return session

    @staticmethod
    def _with_timeout(func, timeout: int = 10):
        """Wrap session.request so a timeout is always included unless explicitly provided."""
        def wrapper(method, url, **kwargs):
            if "timeout" not in kwargs:
                kwargs["timeout"] = timeout
            return func(method, url, **kwargs)
        return wrapper

    def _get_random_headers(self) -> Dict[str, str]:
        """Get randomized headers to avoid blocking."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    def scrape_news(self, query: str, max_articles: int = 20) -> List[Dict]:
        """Main scraping entry point: collect, deduplicate, filter, and enrich articles."""
        logger.info(f"Starting news scraping for query: {query}")
        all_articles: List[Dict] = []
        self.scraped_urls.clear()
        self.content_hashes.clear()

        try:
            # Primary source: the Google News search RSS for the query.
            google_url = self.rss_sources['google_news'].format(quote(query))
            google_articles = self._scrape_google_news(google_url, query, max_articles // 2 or 5)
            all_articles.extend(google_articles)

            # Supplement with a few fixed RSS feeds until the budget is reached.
            for source_name, rss_url in list(self.rss_sources.items())[1:4]:
                if len(all_articles) >= max_articles:
                    break
                try:
                    source_articles = self._scrape_rss_source(rss_url, query, 5)
                    all_articles.extend(source_articles)
                except Exception as e:
                    logger.warning(f"Failed to scrape {source_name}: {str(e)}")
                    continue

            # Deduplicate, keep only relevant items, and cap at max_articles.
            articles = self._deduplicate_articles(all_articles)
            articles = self._filter_articles(articles, query)
            articles = articles[:max_articles]

            # Fetch full article text; fall back to the RSS summary or title.
            for article in articles:
                try:
                    full_content = self._extract_full_content(article['url'])
                    if full_content and len(full_content) > 200:
                        article['content'] = full_content
                    else:
                        fallback = article.get('summary') or article.get('title', '')
                        article['content'] = clean_html(fallback) if '<' in fallback else fallback
                except Exception as e:
                    logger.warning(f"Failed to extract content from {article['url']}: {str(e)}")
                    fallback = article.get('summary') or article.get('title', '')
                    article['content'] = clean_html(fallback) if '<' in fallback else fallback

            # Keep English-language articles only.
            articles = [a for a in articles if self._is_english(a.get('content', ''))]

            logger.info(f"Successfully scraped {len(articles)} articles")
            return articles

        except Exception as e:
            logger.error(f"Error in scrape_news: {str(e)}")
            return []

    def _scrape_google_news(self, url: str, query: str, max_articles: int) -> List[Dict]:
        """Scrape the Google News RSS feed and resolve entries to publisher URLs."""
        try:
            headers = self._get_random_headers()
            response = self.session.get(url, headers=headers)
            if response.status_code != 200:
                logger.warning(f"Google News RSS returned status {response.status_code}")
                return []

            feed = feedparser.parse(response.content)
            articles: List[Dict] = []
            q = query.lower()

            for entry in feed.entries[:max_articles * 2]:
                try:
                    raw_title = entry.get('title', '') or ''
                    raw_summary = entry.get('summary', '') or ''
                    link = entry.get('link', '') or ''

                    # Google News titles and summaries often contain HTML markup.
                    title = clean_html(raw_title) if '<' in raw_title else raw_title
                    summary = clean_html(raw_summary) if '<' in raw_summary else raw_summary

                    # Keep only entries that mention the query.
                    if not (q in title.lower() or q in summary.lower()):
                        continue

                    article = {
                        'title': title.strip(),
                        'url': link,
                        'summary': summary.strip(),
                        'date': self._parse_date(entry.get('published', '')),
                        'source': 'Google News'
                    }

                    # Resolve the Google News redirect to the publisher's URL.
                    try:
                        r = self.session.get(link, headers=headers, allow_redirects=True)
                        final_url = r.url if r is not None and r.url else link
                        article['url'] = final_url
                        article['source'] = self._extract_source_name(final_url)
                    except Exception as e:
                        logger.debug(f"Redirect resolution failed for Google link: {e}")

                    if article['url'] in self.scraped_urls:
                        continue

                    self.scraped_urls.add(article['url'])
                    articles.append(article)

                    if len(articles) >= max_articles:
                        break

                except Exception as e:
                    logger.warning(f"Error parsing Google News entry: {str(e)}")
                    continue

            return articles

        except Exception as e:
            logger.error(f"Error scraping Google News: {str(e)}")
            return []

    def _scrape_rss_source(self, rss_url: str, query: str, max_articles: int) -> List[Dict]:
        """Scrape a generic RSS source and soft-filter entries by query."""
        try:
            headers = self._get_random_headers()
            response = self.session.get(rss_url, headers=headers)
            if response.status_code != 200:
                return []

            feed = feedparser.parse(response.content)
            articles: List[Dict] = []
            q = query.lower()

            for entry in feed.entries[:max_articles * 3]:
                try:
                    title = entry.get('title', '') or ''
                    summary = entry.get('summary', '') or ''
                    link = entry.get('link', '') or ''

                    if not (q in title.lower() or q in summary.lower()):
                        continue

                    # Strip any HTML markup from the feed fields.
                    title = clean_html(title) if '<' in title else title
                    summary = clean_html(summary) if '<' in summary else summary

                    article = {
                        'title': title.strip(),
                        'url': link,
                        'summary': summary.strip(),
                        'date': self._parse_date(entry.get('published', '')),
                        'source': self._extract_source_name(rss_url)
                    }

                    # Follow redirects so deduplication works on the final URL.
                    try:
                        r = self.session.get(link, headers=headers, allow_redirects=True)
                        final_url = r.url if r is not None and r.url else link
                        article['url'] = final_url
                        article['source'] = self._extract_source_name(final_url)
                    except Exception:
                        pass

                    if article['url'] in self.scraped_urls:
                        continue

                    self.scraped_urls.add(article['url'])
                    articles.append(article)

                    if len(articles) >= max_articles:
                        break

                except Exception as e:
                    logger.warning(f"Error parsing RSS entry: {str(e)}")
                    continue

            # Be polite between feed requests.
            time.sleep(0.4)
            return articles

        except Exception as e:
            logger.error(f"Error scraping RSS {rss_url}: {str(e)}")
            return []

    def _extract_full_content(self, url: str) -> Optional[str]:
        """Extract full article content: resolve redirects, fetch HTML, parse with trafilatura."""
        try:
            headers = self._get_random_headers()

            # Resolve Google News links to the publisher's page first.
            parsed = urlparse(url)
            if parsed.netloc.endswith("news.google.com"):
                try:
                    resp = self.session.get(url, headers=headers, allow_redirects=True)
                    if resp is not None and resp.url and resp.status_code in (200, 301, 302):
                        url = resp.url
                except Exception as e:
                    logger.warning(f"Failed to resolve Google News redirect: {e}")

            # Fetch with the shared session first; fall back to trafilatura's own fetcher.
            downloaded_html = None
            try:
                r = self.session.get(url, headers=headers, allow_redirects=True)
                if r is not None and r.status_code == 200:
                    downloaded_html = r.text
            except Exception as e:
                logger.debug(f"requests fetch failed, will try trafilatura.fetch_url: {e}")

            if not downloaded_html:
                downloaded_html = trafilatura.fetch_url(url)

            if not downloaded_html:
                return None

            # Extract the main article text, dropping comments and tables.
            text = trafilatura.extract(
                downloaded_html,
                include_comments=False,
                include_tables=False,
                include_formatting=False,
                no_fallback=False,
            )
            if text and len(text.strip()) > 100:
                return text.strip()
            return None

        except Exception as e:
            logger.warning(f"Error extracting content from {url}: {str(e)}")
            return None

    def _deduplicate_articles(self, articles: List[Dict]) -> List[Dict]:
        """Remove duplicate articles based on a hash of title + summary."""
        unique_articles: List[Dict] = []

        for article in articles:
            content_for_hash = f"{article.get('title', '')} {article.get('summary', '')}"
            content_hash = hashlib.md5(content_for_hash.encode()).hexdigest()

            if content_hash not in self.content_hashes:
                self.content_hashes.add(content_hash)
                unique_articles.append(article)

        logger.info(f"Deduplicated {len(articles)} -> {len(unique_articles)} articles")
        return unique_articles

    def _filter_articles(self, articles: List[Dict], query: str) -> List[Dict]:
        """Filter articles for relevance and minimum length."""
        filtered: List[Dict] = []
        q = query.lower()

        for a in articles:
            title_summary = f"{a.get('title', '')} {a.get('summary', '')}".strip()
            if len(title_summary) < 50:
                continue

            title_l = a.get('title', '').lower()
            summary_l = a.get('summary', '').lower()

            # Keep the article if the full query, or any word from it, appears in the title or summary.
            if (q in title_l) or (q in summary_l) or any(word in title_l for word in q.split()):
                filtered.append(a)

        logger.info(f"Filtered {len(articles)} -> {len(filtered)} articles for relevance")
        return filtered

    def _is_english(self, text: str) -> bool:
        """Check if text is in English using language detection."""
        try:
            if len(text.strip()) < 20:
                # Too short for reliable detection; keep the article.
                return True
            detected_lang = detect(text[:1000])
            return detected_lang == 'en'
        except Exception:
            # If detection fails, keep the article rather than drop it.
            return True

    def _parse_date(self, date_str: str) -> Optional[datetime]:
        """Parse a date from an RSS feed, falling back to the current time."""
        if not date_str:
            return datetime.now()
        try:
            for fmt in [
                '%a, %d %b %Y %H:%M:%S %Z',
                '%a, %d %b %Y %H:%M:%S %z',
                '%Y-%m-%dT%H:%M:%SZ',
                '%Y-%m-%d %H:%M:%S',
            ]:
                try:
                    return datetime.strptime(date_str.strip(), fmt)
                except ValueError:
                    continue
            return datetime.now()
        except Exception:
            return datetime.now()

    def _extract_source_name(self, url: str) -> str:
        """Extract source name from URL."""
        try:
            domain = urlparse(url).netloc
            domain = domain.replace('www.', '').replace('feeds.', '')

            domain_mapping = {
                'news.google.com': 'Google News',
                'finance.yahoo.com': 'Yahoo Finance',
                'reuters.com': 'Reuters',
                'reutersagency.com': 'Reuters',
                'bbc.co.uk': 'BBC',
                'bbc.com': 'BBC',
                'cnbc.com': 'CNBC',
                'marketwatch.com': 'MarketWatch',
                'ft.com': 'Financial Times',
                'bloomberg.com': 'Bloomberg',
            }
            return domain_mapping.get(domain, domain.title())
        except Exception:
            return 'Unknown'

    def get_available_sources(self) -> List[str]:
        """Get list of available news sources."""
        return list(self.rss_sources.keys())


def clean_html(html_content: str) -> str:
    """Clean HTML content and extract readable text."""
    try:
        soup = BeautifulSoup(html_content, 'html.parser')

        # Drop script and style elements entirely.
        for element in soup(["script", "style"]):
            element.extract()

        text = soup.get_text(separator=" ")

        # Collapse the whitespace left behind by removed markup.
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    except Exception as e:
        logger.error(f"Error cleaning HTML: {str(e)}")
        return ""

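
# Illustration only (hypothetical input), showing the intended behavior of clean_html:
#   clean_html("<p>Fed holds <b>rates</b> steady</p>")  ->  "Fed holds rates steady"
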
def is_valid_article_url(url: str) -> bool:
    """Check if URL is likely to be a valid article URL."""
    try:
        parsed = urlparse(url)
        if parsed.scheme not in ('http', 'https') or not parsed.netloc:
            return False

        # Skip direct links to media and document files.
        skip_extensions = ['.pdf', '.jpg', '.png', '.gif', '.mp4', '.mp3', '.webp']
        if any(url.lower().endswith(ext) for ext in skip_extensions):
            return False

        # Skip obvious non-article pages.
        skip_patterns = ['login', 'register', 'subscribe', 'newsletter', 'sitemap']
        if any(pattern in url.lower() for pattern in skip_patterns):
            return False

        return True

    except Exception:
        return False
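

# Minimal usage sketch (an assumption about how this module is meant to be driven;
# the query string and article count below are illustrative only).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    scraper = NewsletterScraper()
    articles = scraper.scrape_news("artificial intelligence", max_articles=5)
    for item in articles:
        print(f"{item['source']}: {item['title']} ({item['url']})")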