import requests
from bs4 import BeautifulSoup
import feedparser
import trafilatura
from urllib.parse import urljoin, urlparse
import time
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Set
import hashlib
import re
from langdetect import detect
import random
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logger = logging.getLogger(__name__)


class NewsletterScraper:
    """Robust news scraper with multiple sources, redirect resolution, and deduplication"""

    def __init__(self):
        self.session = self._create_session()
        self.scraped_urls: Set[str] = set()
        self.content_hashes: Set[str] = set()

        # News sources configuration
        self.rss_sources = {
            'google_news': 'https://news.google.com/rss/search?q={}&hl=en&gl=US&ceid=US:en',
            'yahoo_finance': 'https://feeds.finance.yahoo.com/rss/2.0/headline',
            'reuters_business': 'https://www.reutersagency.com/feed/?best-topics=business-finance&post_type=best',
            'bbc_business': 'http://feeds.bbci.co.uk/news/business/rss.xml',
            'cnbc': 'https://www.cnbc.com/id/100003114/device/rss/rss.html',
            'marketwatch': 'http://feeds.marketwatch.com/marketwatch/topstories/',
            'financial_times': 'https://www.ft.com/rss/home',
            'bloomberg': 'https://feeds.bloomberg.com/politics/news.rss'
        }

        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 12.6; rv:121.0) Gecko/20100101 Firefox/121.0'
        ]

        logger.info("NewsletterScraper initialized")

    def _create_session(self) -> requests.Session:
        """Create a session with retry strategy"""
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "HEAD"],
            raise_on_status=False,
        )
        adapter = HTTPAdapter(max_retries=retry_strategy, pool_connections=20, pool_maxsize=20)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        # Default timeouts for all requests via session
        session.request = self._with_timeout(session.request, timeout=10)
        return session

    @staticmethod
    def _with_timeout(func, timeout: int = 10):
        """Wrap session.request to always include a timeout unless explicitly provided."""
        def wrapper(method, url, **kwargs):
            if "timeout" not in kwargs:
                kwargs["timeout"] = timeout
            return func(method, url, **kwargs)
        return wrapper

    def _get_random_headers(self) -> Dict[str, str]:
        """Get randomized headers to avoid blocking"""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    # -------------------------------------------------------------------------
    # Public entrypoint
    # -------------------------------------------------------------------------
    def scrape_news(self, query: str, max_articles: int = 20) -> List[Dict]:
        """Main scraping function"""
        logger.info(f"Starting news scraping for query: {query}")
        all_articles: List[Dict] = []
        self.scraped_urls.clear()
        self.content_hashes.clear()
        try:
            # Primary: Google News RSS
            google_articles = self._scrape_google_news(query, max_articles // 2 or 5)
            all_articles.extend(google_articles)

            # Secondary: other RSS sources (limit to a few to reduce timeouts on free CPU)
            for source_name, rss_url in list(self.rss_sources.items())[1:4]:
                if len(all_articles) >= max_articles:
                    break
                try:
                    source_articles = self._scrape_rss_source(rss_url, query, 5)
                    all_articles.extend(source_articles)
                except Exception as e:
                    logger.warning(f"Failed to scrape {source_name}: {str(e)}")
                    continue

            # Deduplicate and filter
            articles = self._deduplicate_articles(all_articles)
            articles = self._filter_articles(articles, query)
            articles = articles[:max_articles]

            # Extract full content (after resolving redirects)
            for article in articles:
                try:
                    full_content = self._extract_full_content(article['url'])
                    if full_content and len(full_content) > 200:
                        article['content'] = full_content
                    else:
                        # Fallback to cleaned summary/title
                        fallback = article.get('summary') or article.get('title', '')
                        article['content'] = clean_html(fallback) if '<' in fallback else fallback
                except Exception as e:
                    logger.warning(f"Failed to extract content from {article['url']}: {str(e)}")
                    fallback = article.get('summary') or article.get('title', '')
                    article['content'] = clean_html(fallback) if '<' in fallback else fallback

            # Filter by language (English only)
            articles = [a for a in articles if self._is_english(a.get('content', ''))]

            logger.info(f"Successfully scraped {len(articles)} articles")
            return articles
        except Exception as e:
            logger.error(f"Error in scrape_news: {str(e)}")
            return []

    # -------------------------------------------------------------------------
    # Source-specific + generic RSS
    # -------------------------------------------------------------------------
    def _scrape_google_news(self, query: str, max_articles: int) -> List[Dict]:
        """Scrape Google News RSS and resolve to publisher URLs"""
        try:
            url = self.rss_sources['google_news'].format(requests.utils.quote(query))
            headers = self._get_random_headers()
            response = self.session.get(url, headers=headers)
            if response.status_code != 200:
                logger.warning(f"Google News RSS returned status {response.status_code}")
                return []

            feed = feedparser.parse(response.content)
            articles: List[Dict] = []
            for entry in feed.entries[:max_articles * 2]:  # extra for filtering
                try:
                    raw_title = entry.get('title', '') or ''
                    raw_summary = entry.get('summary', '') or ''
                    link = entry.get('link', '') or ''

                    # Strip HTML if present in title/summary
                    title = clean_html(raw_title) if '<' in raw_title else raw_title
                    summary = clean_html(raw_summary) if '<' in raw_summary else raw_summary

                    article = {
                        'title': title.strip(),
                        'url': link,
                        'summary': summary.strip(),
                        'date': self._parse_date(entry.get('published', '')),
                        'source': 'Google News'
                    }

                    # Try to resolve redirect to publisher and set clean source
                    try:
                        r = self.session.get(link, headers=headers, allow_redirects=True)
                        final_url = r.url if r is not None and r.url else link
                        article['url'] = final_url
                        article['source'] = self._extract_source_name(final_url)
                    except Exception as e:
                        logger.debug(f"Redirect resolution failed for Google link: {e}")

                    if article['url'] in self.scraped_urls:
                        continue
                    self.scraped_urls.add(article['url'])
                    articles.append(article)
                except Exception as e:
                    logger.warning(f"Error parsing Google News entry: {str(e)}")
                    continue
            return articles
        except Exception as e:
            logger.error(f"Error scraping Google News: {str(e)}")
            return []

    def _scrape_rss_source(self, rss_url: str, query: str, max_articles: int) -> List[Dict]:
        """Scrape a generic RSS source and soft-filter by query"""
        try:
            headers = self._get_random_headers()
            response = self.session.get(rss_url, headers=headers)
            if response.status_code != 200:
                return []

            feed = feedparser.parse(response.content)
            articles: List[Dict] = []
            q = query.lower()
            for entry in feed.entries[:max_articles * 3]:
                try:
                    title = entry.get('title', '') or ''
                    summary = entry.get('summary', '') or ''
                    link = entry.get('link', '') or ''
                    if not (q in title.lower() or q in summary.lower()):
                        continue

                    # Clean any HTML artifacts
                    title = clean_html(title) if '<' in title else title
                    summary = clean_html(summary) if '<' in summary else summary

                    article = {
                        'title': title.strip(),
                        'url': link,
                        'summary': summary.strip(),
                        'date': self._parse_date(entry.get('published', '')),
                        'source': self._extract_source_name(rss_url)
                    }

                    # Resolve potential redirects to get publisher domain
                    try:
                        r = self.session.get(link, headers=headers, allow_redirects=True)
                        final_url = r.url if r is not None and r.url else link
                        article['url'] = final_url
                        article['source'] = self._extract_source_name(final_url)
                    except Exception:
                        pass

                    if article['url'] in self.scraped_urls:
                        continue
                    self.scraped_urls.add(article['url'])
                    articles.append(article)
                    if len(articles) >= max_articles:
                        break
                except Exception as e:
                    logger.warning(f"Error parsing RSS entry: {str(e)}")
                    continue

            time.sleep(0.4)  # be polite
            return articles
        except Exception as e:
            logger.error(f"Error scraping RSS {rss_url}: {str(e)}")
            return []

    # -------------------------------------------------------------------------
    # Extraction + cleaning
    # -------------------------------------------------------------------------
    def _extract_full_content(self, url: str) -> Optional[str]:
        """Extract full article content; resolve redirects; parse with trafilatura."""
        try:
            headers = self._get_random_headers()

            # If it's a Google News link, follow redirects to the publisher URL
            parsed = urlparse(url)
            if parsed.netloc.endswith("news.google.com"):
                try:
                    resp = self.session.get(url, headers=headers, allow_redirects=True)
                    if resp is not None and resp.url and resp.status_code in (200, 301, 302):
                        url = resp.url
                except Exception as e:
                    logger.warning(f"Failed to resolve Google News redirect: {e}")

            # 1st try: fetch HTML with our session (so we control headers)
            downloaded_html = None
            try:
                r = self.session.get(url, headers=headers, allow_redirects=True)
                if r is not None and r.status_code == 200:
                    downloaded_html = r.text
            except Exception as e:
                logger.debug(f"requests fetch failed, will try trafilatura.fetch_url: {e}")

            # Fallback: let trafilatura fetch the URL itself (no headers param)
            if not downloaded_html:
                downloaded_html = trafilatura.fetch_url(url)
            if not downloaded_html:
                return None

            # Extract readable text
            text = trafilatura.extract(
                downloaded_html,
                include_comments=False,
                include_tables=False,
                include_formatting=False,
                no_fallback=False,
            )
            if text and len(text.strip()) > 100:
                return text.strip()
            return None
        except Exception as e:
            logger.warning(f"Error extracting content from {url}: {str(e)}")
            return None

    # -------------------------------------------------------------------------
    # Post-processing helpers
    # -------------------------------------------------------------------------
    def _deduplicate_articles(self, articles: List[Dict]) -> List[Dict]:
        """Remove duplicate articles based on title+summary similarity"""
        unique_articles: List[Dict] = []
        for article in articles:
            content_for_hash = f"{article.get('title', '')} {article.get('summary', '')}"
            content_hash = hashlib.md5(content_for_hash.encode()).hexdigest()
            if content_hash not in self.content_hashes:
                self.content_hashes.add(content_hash)
                unique_articles.append(article)
        logger.info(f"Deduplicated {len(articles)} -> {len(unique_articles)} articles")
        return unique_articles

    def _filter_articles(self, articles: List[Dict], query: str) -> List[Dict]:
        """Filter articles for relevance and quality"""
        filtered: List[Dict] = []
        q = query.lower()
        for a in articles:
            title_summary = f"{a.get('title', '')} {a.get('summary', '')}".strip()
            if len(title_summary) < 50:
                continue
            title_l = a.get('title', '').lower()
            summary_l = a.get('summary', '').lower()
            if (q in title_l) or (q in summary_l) or any(word in title_l for word in q.split()):
                filtered.append(a)
        logger.info(f"Filtered {len(articles)} -> {len(filtered)} articles for relevance")
        return filtered

    def _is_english(self, text: str) -> bool:
        """Check if text is in English using language detection"""
        try:
            if len(text.strip()) < 20:
                return True  # too short to decide; keep it
            detected_lang = detect(text[:1000])
            return detected_lang == 'en'
        except Exception:
            # If detection fails, assume English
            return True

    def _parse_date(self, date_str: str) -> datetime:
        """Parse a date from an RSS feed, falling back to the current time"""
        if not date_str:
            return datetime.now()
        try:
            for fmt in [
                '%a, %d %b %Y %H:%M:%S %Z',
                '%a, %d %b %Y %H:%M:%S %z',
                '%Y-%m-%dT%H:%M:%SZ',
                '%Y-%m-%d %H:%M:%S',
            ]:
                try:
                    return datetime.strptime(date_str.strip(), fmt)
                except ValueError:
                    continue
            return datetime.now()
        except Exception:
            return datetime.now()

    def _extract_source_name(self, url: str) -> str:
        """Extract source name from URL"""
        try:
            domain = urlparse(url).netloc
            domain = domain.replace('www.', '').replace('feeds.', '')
            domain_mapping = {
                'news.google.com': 'Google News',
                'finance.yahoo.com': 'Yahoo Finance',
                'reuters.com': 'Reuters',
                'reutersagency.com': 'Reuters',
                'bbc.co.uk': 'BBC',
                'bbc.com': 'BBC',
                'cnbc.com': 'CNBC',
                'marketwatch.com': 'MarketWatch',
                'ft.com': 'Financial Times',
                'bloomberg.com': 'Bloomberg'
            }
            return domain_mapping.get(domain, domain.title())
        except Exception:
            return 'Unknown'

    def get_available_sources(self) -> List[str]:
        """Get list of available news sources"""
        return list(self.rss_sources.keys())


# -----------------------------------------------------------------------------
# Module-level helpers
# -----------------------------------------------------------------------------
def clean_html(html_content: str) -> str:
    """Clean HTML content and extract readable text"""
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.extract()
        text = soup.get_text(separator=" ")
        # Clean up whitespace
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        text = ' '.join(chunk for chunk in chunks if chunk)
        # Collapse any remaining excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        return text.strip()
    except Exception as e:
        logger.error(f"Error cleaning HTML: {str(e)}")
        return ""


def is_valid_article_url(url: str) -> bool:
    """Check if URL is likely to be a valid article URL"""
    try:
        parsed = urlparse(url)
        # Skip certain file types
        skip_extensions = ['.pdf', '.jpg', '.png', '.gif', '.mp4', '.mp3', '.webp']
        if any(url.lower().endswith(ext) for ext in skip_extensions):
            return False
        # Skip obvious non-article URLs
        skip_patterns = ['login', 'register', 'subscribe', 'newsletter', 'sitemap']
        if any(pattern in url.lower() for pattern in skip_patterns):
            return False
        return True
    except Exception:
        return False
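

# -----------------------------------------------------------------------------
# Example usage (illustrative sketch only): run the scraper directly and print
# what it finds. The query string and article count below are arbitrary
# assumptions for demonstration, not values required by the module.
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    scraper = NewsletterScraper()
    # Hypothetical query; any topic string works with scrape_news()
    results = scraper.scrape_news("artificial intelligence", max_articles=5)
    for item in results:
        print(f"{item['source']}: {item['title']}")
        print(f"  {item['url']}")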