|
import hashlib
import logging
import random
import re
import time
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Set
from urllib.parse import urljoin, urlparse, quote

import feedparser
import requests
import trafilatura
from bs4 import BeautifulSoup
from langdetect import detect
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
|
|
|
# Module-level logger; handler/level configuration is left to the application.
logger = logging.getLogger(__name__)
|
|
|
class NewsletterScraper:
    """Robust news scraper with multiple sources and deduplication.

    Pipeline (see :meth:`scrape_news`): collect candidate articles from the
    Google News RSS search feed plus a subset of the configured RSS sources,
    de-duplicate them by a title+summary hash, filter for query relevance,
    fetch each article's full text, and keep only English-language results.
    """

    def __init__(self):
        # One shared session so every request benefits from the retry policy.
        self.session = self._create_session()
        # Per-run bookkeeping; cleared at the start of each scrape_news() call.
        self.scraped_urls: Set[str] = set()
        self.content_hashes: Set[str] = set()

        # RSS endpoints. 'google_news' is a template whose {} placeholder
        # receives the URL-encoded search query; the rest are fixed feeds.
        self.rss_sources = {
            'google_news': 'https://news.google.com/rss/search?q={}&hl=en&gl=US&ceid=US:en',
            'yahoo_finance': 'https://feeds.finance.yahoo.com/rss/2.0/headline',
            'reuters_business': 'https://www.reutersagency.com/feed/?best-topics=business-finance&post_type=best',
            'bbc_business': 'http://feeds.bbci.co.uk/news/business/rss.xml',
            'cnbc': 'https://www.cnbc.com/id/100003114/device/rss/rss.html',
            'marketwatch': 'http://feeds.marketwatch.com/marketwatch/topstories/',
            'financial_times': 'https://www.ft.com/rss/home',
            'bloomberg': 'https://feeds.bloomberg.com/politics/news.rss'
        }

        # Rotated User-Agent strings to reduce the chance of being blocked.
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0'
        ]

        logger.info("NewsletterScraper initialized")

    def _create_session(self) -> requests.Session:
        """Create a requests session that retries transient HTTP failures.

        Retries up to 3 times with exponential backoff on 429 and 5xx
        responses, for both http and https URLs.
        """
        session = requests.Session()

        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        return session

    def _get_random_headers(self) -> Dict[str, str]:
        """Return browser-like request headers with a random User-Agent."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    def scrape_news(self, query: str, max_articles: int = 20) -> List[Dict]:
        """Scrape, deduplicate, and enrich news articles matching *query*.

        Args:
            query: Free-text search query.
            max_articles: Upper bound on the number of articles returned.

        Returns:
            A list of article dicts with keys 'title', 'url', 'summary',
            'date', 'source', and 'content'. Returns [] on total failure.
        """
        logger.info("Starting news scraping for query: %s", query)

        all_articles = []
        # Reset per-run dedup state so repeated calls start fresh.
        self.scraped_urls.clear()
        self.content_hashes.clear()

        try:
            # Google News search supplies roughly half of the candidates.
            google_articles = self._scrape_google_news(query, max_articles // 2)
            all_articles.extend(google_articles)

            # Top up from a fixed slice of the other configured feeds.
            for source_name, rss_url in list(self.rss_sources.items())[1:4]:
                if len(all_articles) >= max_articles:
                    break

                try:
                    source_articles = self._scrape_rss_source(rss_url, query, 5)
                    all_articles.extend(source_articles)
                except Exception as e:
                    logger.warning("Failed to scrape %s: %s", source_name, e)
                    continue

            articles = self._deduplicate_articles(all_articles)
            articles = self._filter_articles(articles, query)
            articles = articles[:max_articles]

            # Fetch full article text; fall back to the RSS summary/title.
            for article in articles:
                try:
                    full_content = self._extract_full_content(article['url'])
                    if full_content and len(full_content) > 200:
                        article['content'] = full_content
                    else:
                        article['content'] = article.get('summary', article.get('title', ''))
                except Exception as e:
                    logger.warning("Failed to extract content from %s: %s", article['url'], e)
                    article['content'] = article.get('summary', article.get('title', ''))

            # Drop articles whose content is not detected as English.
            articles = [article for article in articles if self._is_english(article['content'])]

            logger.info("Successfully scraped %d articles", len(articles))
            return articles

        except Exception as e:
            logger.error("Error in scrape_news: %s", e)
            return []

    def _scrape_google_news(self, query: str, max_articles: int) -> List[Dict]:
        """Scrape the Google News RSS search feed for *query*."""
        try:
            # BUG FIX: the query used to be interpolated with only spaces
            # replaced by %20, so characters such as '&', '#' or '?' broke
            # the feed URL. quote() percent-encodes them all.
            url = self.rss_sources['google_news'].format(quote(query))

            headers = self._get_random_headers()
            response = self.session.get(url, headers=headers, timeout=10)

            if response.status_code != 200:
                logger.warning("Google News RSS returned status %s", response.status_code)
                return []

            feed = feedparser.parse(response.content)
            articles = []

            # Read extra entries so dedup/filtering still leaves enough.
            for entry in feed.entries[:max_articles * 2]:
                try:
                    article = {
                        'title': entry.title,
                        'url': entry.link,
                        'summary': entry.get('summary', ''),
                        'date': self._parse_date(entry.get('published', '')),
                        'source': 'Google News'
                    }

                    # Skip URLs already collected during this run.
                    if article['url'] in self.scraped_urls:
                        continue

                    self.scraped_urls.add(article['url'])
                    articles.append(article)

                except Exception as e:
                    logger.warning("Error parsing Google News entry: %s", e)
                    continue

            return articles

        except Exception as e:
            logger.error("Error scraping Google News: %s", e)
            return []

    def _scrape_rss_source(self, rss_url: str, query: str, max_articles: int) -> List[Dict]:
        """Scrape a generic RSS feed, keeping only entries mentioning *query*."""
        try:
            headers = self._get_random_headers()
            response = self.session.get(rss_url, headers=headers, timeout=10)

            if response.status_code != 200:
                return []

            feed = feedparser.parse(response.content)
            articles = []
            query_lower = query.lower()

            # Scan extra entries because most will fail the relevance check.
            for entry in feed.entries[:max_articles * 3]:
                try:
                    title = entry.get('title', '')
                    summary = entry.get('summary', '')

                    # Cheap relevance pre-filter: query must appear verbatim
                    # in the title or summary.
                    if not (query_lower in title.lower() or query_lower in summary.lower()):
                        continue

                    article = {
                        'title': title,
                        'url': entry.get('link', ''),
                        'summary': summary,
                        'date': self._parse_date(entry.get('published', '')),
                        'source': self._extract_source_name(rss_url)
                    }

                    if article['url'] in self.scraped_urls:
                        continue

                    self.scraped_urls.add(article['url'])
                    articles.append(article)

                    if len(articles) >= max_articles:
                        break

                except Exception as e:
                    logger.warning("Error parsing RSS entry: %s", e)
                    continue

            # Small courtesy delay between feed fetches.
            time.sleep(0.5)

            return articles

        except Exception as e:
            logger.error("Error scraping RSS %s: %s", rss_url, e)
            return []

    def _extract_full_content(self, url: str) -> Optional[str]:
        """Extract full article text for *url* using trafilatura.

        Returns the stripped text when extraction yields more than 100
        characters, otherwise None.
        """
        try:
            headers = self._get_random_headers()

            # BUG FIX: trafilatura.fetch_url() accepts no `headers` argument
            # (it manages its own HTTP layer), so the old call raised
            # TypeError for every article and full text was never extracted.
            # Download with our retrying session instead and hand the HTML
            # string to trafilatura.extract().
            response = self.session.get(url, headers=headers, timeout=10)
            if response.status_code != 200 or not response.text:
                return None

            text = trafilatura.extract(
                response.text,
                include_comments=False,
                include_tables=False,
                include_formatting=False,
                no_fallback=False
            )

            if text and len(text.strip()) > 100:
                return text.strip()

            return None

        except Exception as e:
            logger.warning("Error extracting content from %s: %s", url, e)
            return None

    def _deduplicate_articles(self, articles: List[Dict]) -> List[Dict]:
        """Remove duplicate articles based on a title+summary content hash."""
        unique_articles = []

        for article in articles:
            # MD5 used purely as a non-cryptographic fingerprint.
            content_for_hash = f"{article['title']} {article.get('summary', '')}"
            content_hash = hashlib.md5(content_for_hash.encode()).hexdigest()

            if content_hash not in self.content_hashes:
                self.content_hashes.add(content_hash)
                unique_articles.append(article)

        logger.info("Deduplicated %d -> %d articles", len(articles), len(unique_articles))
        return unique_articles

    def _filter_articles(self, articles: List[Dict], query: str) -> List[Dict]:
        """Filter articles for relevance to *query* and minimal quality."""
        filtered_articles = []
        query_lower = query.lower()

        for article in articles:
            # Discard entries with almost no text to work with.
            title_summary = f"{article['title']} {article.get('summary', '')}"
            if len(title_summary.strip()) < 50:
                continue

            # Keep when the full query appears in title/summary, or any
            # individual query word appears in the title.
            if (query_lower in article['title'].lower() or
                query_lower in article.get('summary', '').lower() or
                any(word in article['title'].lower() for word in query_lower.split())):

                filtered_articles.append(article)

        logger.info("Filtered %d -> %d articles for relevance", len(articles), len(filtered_articles))
        return filtered_articles

    def _is_english(self, text: str) -> bool:
        """Return True when *text* is detected as English.

        Very short texts (and detection failures) default to True so that
        borderline articles are not silently dropped.
        """
        try:
            if len(text.strip()) < 20:
                return True

            # Only sample the first 1000 chars; enough for detection.
            detected_lang = detect(text[:1000])
            return detected_lang == 'en'

        except Exception:
            return True

    def _parse_date(self, date_str: str) -> Optional[datetime]:
        """Parse an RSS date string; fall back to the current time.

        Returns a naive datetime so results compare consistently with
        datetime.now().
        """
        if not date_str:
            return datetime.now()

        try:
            # BUG FIX: RFC 822 dates with numeric offsets ("+0000"), the
            # most common RSS form, previously matched no format and fell
            # through to datetime.now(), losing the real publish date.
            for fmt in ['%a, %d %b %Y %H:%M:%S %z',
                        '%a, %d %b %Y %H:%M:%S %Z',
                        '%Y-%m-%dT%H:%M:%SZ',
                        '%Y-%m-%d %H:%M:%S']:
                try:
                    parsed = datetime.strptime(date_str.strip(), fmt)
                    # Normalize aware datetimes to naive for consistency.
                    return parsed.replace(tzinfo=None) if parsed.tzinfo else parsed
                except ValueError:
                    continue

            return datetime.now()

        except Exception:
            return datetime.now()

    def _extract_source_name(self, url: str) -> str:
        """Map a feed URL's domain to a human-readable source name."""
        try:
            domain = urlparse(url).netloc

            # Strip common hosting prefixes before lookup.
            domain = domain.replace('www.', '').replace('feeds.', '')

            domain_mapping = {
                'news.google.com': 'Google News',
                'finance.yahoo.com': 'Yahoo Finance',
                'reuters.com': 'Reuters',
                'reutersagency.com': 'Reuters',
                'bbc.co.uk': 'BBC',
                'cnbc.com': 'CNBC',
                'marketwatch.com': 'MarketWatch',
                'ft.com': 'Financial Times',
                'bloomberg.com': 'Bloomberg'
            }

            # Unknown domains fall back to a title-cased domain string.
            return domain_mapping.get(domain, domain.title())

        except Exception:
            return 'Unknown'

    def get_available_sources(self) -> List[str]:
        """Return the names of all configured news sources."""
        return list(self.rss_sources.keys())
|
|
|
|
|
def clean_html(html_content: str) -> str:
    """Strip markup from raw HTML and return its whitespace-normalized text.

    Returns "" when parsing fails for any reason.
    """
    try:
        soup = BeautifulSoup(html_content, 'html.parser')

        # Remove elements that never contain readable copy.
        for removable in soup(["script", "style"]):
            removable.extract()

        text = soup.get_text()

        # Normalize whitespace: strip each line, break apart padded gaps,
        # then re-join the non-empty fragments with single spaces.
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        return ' '.join(chunk for chunk in chunks if chunk)

    except Exception as e:
        logger.error(f"Error cleaning HTML: {str(e)}")
        return ""
|
|
|
def is_valid_article_url(url: str) -> bool:
    """Check if URL is likely to be a valid article URL.

    Rejects non-HTTP(S) URLs, URLs without a host, direct links to media
    files, and common non-article pages (login, signup, sitemaps, ...).
    """
    try:
        parsed = urlparse(url)

        # BUG FIX: `parsed` was previously computed but never used, so any
        # string — even one that is not a URL at all — passed the structural
        # check. Require an http(s) scheme and a non-empty host.
        if parsed.scheme not in ('http', 'https') or not parsed.netloc:
            return False

        # Skip direct links to non-HTML resources.
        skip_extensions = ['.pdf', '.jpg', '.png', '.gif', '.mp4', '.mp3']
        if any(url.lower().endswith(ext) for ext in skip_extensions):
            return False

        # Skip obvious non-article pages.
        skip_patterns = ['login', 'register', 'subscribe', 'newsletter', 'sitemap']
        if any(pattern in url.lower() for pattern in skip_patterns):
            return False

        return True

    except Exception:
        return False