# Link2Doc / WebScraper.py
import time
import os
import re
import urllib.parse
from datetime import datetime
from bs4 import BeautifulSoup
try:
    # Keep all selenium imports inside the guard so a missing package
    # only disables the flag instead of crashing at import time.
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.chrome.service import Service
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    SELENIUM_AVAILABLE = True
except ImportError:
    SELENIUM_AVAILABLE = False
    print("Selenium not available. Some features may not work.")
class WebsiteScraper:
def __init__(self, base_url, site_name, site_description="", site_category="General",
output_dir=None, max_depth=3, max_pages=50, delay=2, headless=True,
scrape_external_links=False, content_selectors=None):
"""
Initialize the website scraper.
Args:
base_url (str): Starting URL to scrape
site_name (str): Name of the website
site_description (str): Description of the website
site_category (str): Category of the website
output_dir (str): Directory to save files (auto-generated if None)
max_depth (int): Maximum depth to crawl
max_pages (int): Maximum number of pages to scrape
delay (float): Delay between requests in seconds
headless (bool): Run browser in headless mode
scrape_external_links (bool): Whether to follow external links
content_selectors (list): CSS selectors to find main content
"""
parsed_url = urllib.parse.urlparse(base_url)
self.base_url = base_url
self.base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
self.domain_name = parsed_url.netloc
self.site_name = site_name
self.site_description = site_description
self.site_category = site_category
self.scrape_external_links = scrape_external_links
self.content_selectors = content_selectors or [
'main', 'article', '.content', '#content', '.main-content',
'.post-content', '.entry-content', '.page-content', 'body'
]
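        # Selectors are tried in listed order by extract_main_content();
        # the first one that matches is treated as the page's main content.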
self.max_depth = max_depth
self.max_pages = max_pages
self.delay = delay
self.visited_links = set()
self.page_count = 0
self.start_time = datetime.now()
if output_dir is None:
domain_safe = self.domain_name.replace(".", "_").replace(":", "_")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.output_dir = f"{site_name}_{domain_safe}_{timestamp}"
else:
self.output_dir = output_dir
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
self.log_path = os.path.join(self.output_dir, "scraping_log.txt")
with open(self.log_path, "w", encoding="utf-8") as log_file:
log_file.write(f"Website scraping started at: {self.start_time}\n")
log_file.write(f"Website: {self.site_name}\n")
log_file.write(f"Description: {self.site_description}\n")
log_file.write(f"Category: {self.site_category}\n")
log_file.write(f"Base URL: {self.base_url}\n")
log_file.write(f"Domain: {self.domain_name}\n")
log_file.write(f"Max depth: {self.max_depth}\n")
log_file.write(f"Max pages: {self.max_pages}\n")
log_file.write(f"External links: {self.scrape_external_links}\n\n")
self.setup_driver(headless)
self.documents = []
def setup_driver(self, headless):
"""Setup Chrome driver with options."""
        try:
            # Fail fast if selenium could not be imported at module load time
            if not SELENIUM_AVAILABLE:
                raise RuntimeError("Selenium is not installed; cannot initialize the browser driver.")
            chrome_options = Options()
if headless:
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-logging")
chrome_options.add_argument("--log-level=3")
chrome_options.add_argument("--disable-extensions")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--disable-web-security")
chrome_options.add_argument("--allow-running-insecure-content")
chrome_options.add_argument("--disable-features=VizDisplayCompositor")
chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
chrome_options.binary_location = "/usr/bin/chromium"
chrome_options.add_experimental_option('excludeSwitches', ['enable-logging'])
chrome_options.add_experimental_option('useAutomationExtension', False)
            try:
                # Selenium 4 expects the driver path via a Service object;
                # try the system chromedriver first.
                self.driver = webdriver.Chrome(
                    service=Service("/usr/bin/chromedriver"),
                    options=chrome_options
                )
            except Exception:
                # Fall back to webdriver-manager if the system driver is unavailable
                from webdriver_manager.chrome import ChromeDriverManager
                self.driver = webdriver.Chrome(
                    service=Service(ChromeDriverManager().install()),
                    options=chrome_options
                )
self.log_message("Chrome driver initialized successfully")
except Exception as e:
self.log_message(f"Error setting up Chrome driver: {e}")
raise
def log_message(self, message):
"""Write message to console and log file."""
print(message)
with open(self.log_path, "a", encoding="utf-8") as log_file:
log_file.write(f"{message}\n")
def is_valid_url(self, url):
"""Check if URL should be scraped."""
if not self.scrape_external_links and not url.startswith(self.base_domain):
return False
if re.search(r"\.(pdf|doc|docx|xls|xlsx|ppt|pptx|jpg|jpeg|png|gif|svg|ico|css|js|xml|json|zip|tar|gz|rar|7z|exe|dmg|mp3|mp4|avi|mov|wmv)$", url, re.IGNORECASE):
return False
if "#" in url:
url = url.split("#")[0]
if url in self.visited_links:
return False
skip_patterns = [
'/login', '/register', '/signup', '/sign-up', '/signin', '/sign-in',
'/logout', '/password', '/forgot', '/reset',
'/admin', '/dashboard', '/account', '/profile',
'/cart', '/checkout', '/payment', '/billing',
'/terms', '/privacy', '/legal', '/disclaimer',
'/sitemap', '/robots.txt', '/favicon'
]
url_lower = url.lower()
for pattern in skip_patterns:
if pattern in url_lower:
return False
spam_patterns = ['popup', 'advertisement', 'tracking', 'analytics']
for pattern in spam_patterns:
if pattern in url_lower:
return False
return True
def sanitize_filename(self, text):
"""Convert text to safe filename."""
if not text or len(text.strip()) == 0:
return f"page_{self.page_count}"
safe_name = re.sub(r'[^\w\s()-]', "_", text)
safe_name = re.sub(r'\s+', "_", safe_name)
        safe_name = safe_name.strip("_")
        if not safe_name:
            return f"page_{self.page_count}"
        return safe_name[:100]
def extract_links(self):
"""Extract valid links from current page."""
links = self.driver.find_elements(By.TAG_NAME, "a")
valid_links = []
        for link in links:
            try:
                href = link.get_attribute("href")
                if not href:
                    continue
                # Resolve relative URLs against the current page URL
                href = urllib.parse.urljoin(self.driver.current_url, href)
                if not href.startswith(("http://", "https://")):
                    continue  # skip mailto:, javascript:, tel:, etc.
                if self.is_valid_url(href) and href not in self.visited_links:
                    valid_links.append(href)
            except Exception:
                continue
return list(set(valid_links))
def extract_main_content(self, soup):
"""Extract main content using various selectors."""
content_element = None
for selector in self.content_selectors:
try:
if selector.startswith('.') or selector.startswith('#'):
elements = soup.select(selector)
else:
elements = soup.find_all(selector)
if elements:
content_element = elements[0]
break
            except Exception:
                continue
if not content_element:
content_element = soup.find('body')
return content_element
def extract_clean_text(self, soup):
"""Extract and clean text from BeautifulSoup object."""
unwanted_tags = [
"script", "style", "nav", "footer", "header", "aside",
"advertisement", "ads", "popup", "modal", "cookie-notice"
]
for tag in unwanted_tags:
for element in soup.find_all(tag):
element.decompose()
unwanted_classes = [
"sidebar", "menu", "navigation", "nav", "footer", "header",
"advertisement", "ad", "ads", "popup", "modal", "cookie",
"social", "share", "comment", "related", "recommended"
]
for class_name in unwanted_classes:
for element in soup.find_all(class_=re.compile(class_name, re.I)):
element.decompose()
for element in soup.find_all(id=re.compile(class_name, re.I)):
element.decompose()
main_content = self.extract_main_content(soup)
        if main_content:
            # Newline separator preserves block structure so the line-based cleanup below has an effect
            text = main_content.get_text(separator="\n", strip=True)
        else:
            text = soup.get_text(separator="\n", strip=True)
lines = [line.strip() for line in text.split('\n') if line.strip()]
cleaned_text = '\n'.join(lines)
cleaned_text = re.sub(r'\n\s*\n', '\n\n', cleaned_text)
cleaned_text = re.sub(r' +', ' ', cleaned_text)
return cleaned_text
def scrape_page(self, url):
"""Scrape content from a single page and save as markdown."""
if url in self.visited_links:
return []
self.page_count += 1
self.visited_links.add(url)
status = f"Scraping [{self.page_count}/{self.max_pages}]: {url}"
self.log_message(status)
try:
self.driver.get(url)
WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
time.sleep(self.delay)
try:
page_title = self.driver.title or f"Page_{self.page_count}"
            except Exception:
                page_title = f"Page_{self.page_count}"
soup = BeautifulSoup(self.driver.page_source, "html.parser")
cleaned_text = self.extract_clean_text(soup)
if len(cleaned_text.strip()) < 50:
self.log_message(f"Skipping {url}: insufficient content")
return self.extract_links()
meta_desc = ""
meta_tag = soup.find("meta", attrs={"name": "description"})
if meta_tag:
meta_desc = meta_tag.get("content", "")
doc = {
"text": cleaned_text,
"metadata": {
"source": url,
"title": page_title,
"site_name": self.site_name,
"site_description": self.site_description,
"site_category": self.site_category,
"meta_description": meta_desc,
"domain": self.domain_name,
"scraped_at": datetime.now().isoformat()
}
}
self.documents.append(doc)
safe_filename = self.sanitize_filename(page_title)
file_path = os.path.join(self.output_dir, f"{safe_filename}.md")
counter = 1
original_path = file_path
while os.path.exists(file_path):
base, ext = os.path.splitext(original_path)
file_path = f"{base}_{counter}{ext}"
counter += 1
with open(file_path, "w", encoding="utf-8") as file:
file.write(f"# {page_title}\n\n")
file.write(f"**URL:** {url}\n")
file.write(f"**Site:** {self.site_name}\n")
file.write(f"**Category:** {self.site_category}\n")
if meta_desc:
file.write(f"**Description:** {meta_desc}\n")
file.write(f"**Scraped:** {datetime.now()}\n\n")
file.write("---\n\n")
file.write(cleaned_text)
self.log_message(f"Saved: {os.path.basename(file_path)}")
new_links = self.extract_links()
self.log_message(f"Found {len(new_links)} new links")
return new_links
except Exception as e:
self.log_message(f"Error scraping {url}: {str(e)}")
return []
def create_summary(self):
"""Create a summary of the scraped content."""
summary_path = os.path.join(self.output_dir, "scraping_summary.md")
with open(summary_path, "w", encoding="utf-8") as f:
f.write(f"# Scraping Summary: {self.site_name}\n\n")
f.write(f"**Website:** {self.site_name}\n")
f.write(f"**URL:** {self.base_url}\n")
f.write(f"**Domain:** {self.domain_name}\n")
f.write(f"**Category:** {self.site_category}\n")
f.write(f"**Description:** {self.site_description}\n\n")
f.write(f"**Scraping Details:**\n")
f.write(f"- Start time: {self.start_time}\n")
f.write(f"- End time: {datetime.now()}\n")
f.write(f"- Duration: {datetime.now() - self.start_time}\n")
f.write(f"- Pages scraped: {len(self.documents)}\n")
f.write(f"- Max pages allowed: {self.max_pages}\n")
f.write(f"- Max depth: {self.max_depth}\n")
f.write(f"- External links allowed: {self.scrape_external_links}\n\n")
if self.documents:
f.write("**Scraped Pages:**\n")
for i, doc in enumerate(self.documents, 1):
f.write(f"{i}. [{doc['metadata']['title']}]({doc['metadata']['source']})\n")
def start(self):
"""Start the website scraping process."""
try:
self.log_message(f"Starting website scraping for {self.site_name}")
self.log_message(f"Target: {self.base_url}")
self.log_message(f"Limits: max_depth={self.max_depth}, max_pages={self.max_pages}")
urls_to_scrape = [(self.base_url, 0)]
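            # Breadth-first crawl: the queue holds (url, depth) pairs, and links
            # found on a page are enqueued one level deeper, up to max_depth.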
while urls_to_scrape and self.page_count < self.max_pages:
current_url, current_depth = urls_to_scrape.pop(0)
if current_url in self.visited_links or current_depth > self.max_depth:
continue
new_links = self.scrape_page(current_url)
if current_depth + 1 <= self.max_depth:
for link in new_links:
if link not in self.visited_links:
urls_to_scrape.append((link, current_depth + 1))
self.create_summary()
self.driver.quit()
end_time = datetime.now()
duration = end_time - self.start_time
self.log_message(f"Scraping completed for {self.site_name}")
self.log_message(f"Total pages scraped: {self.page_count}")
self.log_message(f"Duration: {duration}")
return {
"success": True,
"pages_scraped": self.page_count,
"duration": str(duration),
"output_dir": self.output_dir
}
except Exception as e:
self.driver.quit()
self.log_message(f"Scraping failed: {str(e)}")
return {
"success": False,
"error": str(e),
"pages_scraped": self.page_count,
"duration": "0",
"output_dir": self.output_dir
}
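# Example usage: a minimal sketch of how WebsiteScraper might be run directly.
# The URL, site name, and limits below are placeholders for illustration, not
# values taken from the Link2Doc space itself.
if __name__ == "__main__":
    scraper = WebsiteScraper(
        base_url="https://example.com",   # hypothetical target site
        site_name="ExampleSite",          # hypothetical label used for output naming
        site_description="Demo run of the scraper",
        site_category="General",
        max_depth=1,                      # keep the crawl shallow for a quick test
        max_pages=5,
        delay=1,
        headless=True,
    )
    result = scraper.start()
    print(result)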