import os
import re
import time
import urllib.parse
from datetime import datetime

from bs4 import BeautifulSoup

try:
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.chrome.service import Service
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    SELENIUM_AVAILABLE = True
except ImportError:
    SELENIUM_AVAILABLE = False
    print("Selenium not available. Some features may not work.")


class WebsiteScraper:
    def __init__(self, base_url, site_name, site_description="", site_category="General",
                 output_dir=None, max_depth=3, max_pages=50, delay=2, headless=True,
                 scrape_external_links=False, content_selectors=None):
        """
        Initialize the website scraper.

        Args:
            base_url (str): Starting URL to scrape
            site_name (str): Name of the website
            site_description (str): Description of the website
            site_category (str): Category of the website
            output_dir (str): Directory to save files (auto-generated if None)
            max_depth (int): Maximum depth to crawl
            max_pages (int): Maximum number of pages to scrape
            delay (float): Delay between requests in seconds
            headless (bool): Run browser in headless mode
            scrape_external_links (bool): Whether to follow external links
            content_selectors (list): CSS selectors to find main content
        """
        parsed_url = urllib.parse.urlparse(base_url)
        self.base_url = base_url
        self.base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
        self.domain_name = parsed_url.netloc
        self.site_name = site_name
        self.site_description = site_description
        self.site_category = site_category
        self.scrape_external_links = scrape_external_links
        self.content_selectors = content_selectors or [
            'main', 'article', '.content', '#content', '.main-content',
            '.post-content', '.entry-content', '.page-content', 'body'
        ]
        self.max_depth = max_depth
        self.max_pages = max_pages
        self.delay = delay
        self.visited_links = set()
        self.page_count = 0
        self.start_time = datetime.now()

        if output_dir is None:
            domain_safe = self.domain_name.replace(".", "_").replace(":", "_")
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.output_dir = f"{site_name}_{domain_safe}_{timestamp}"
        else:
            self.output_dir = output_dir

        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

        self.log_path = os.path.join(self.output_dir, "scraping_log.txt")
        with open(self.log_path, "w", encoding="utf-8") as log_file:
            log_file.write(f"Website scraping started at: {self.start_time}\n")
            log_file.write(f"Website: {self.site_name}\n")
            log_file.write(f"Description: {self.site_description}\n")
            log_file.write(f"Category: {self.site_category}\n")
            log_file.write(f"Base URL: {self.base_url}\n")
            log_file.write(f"Domain: {self.domain_name}\n")
            log_file.write(f"Max depth: {self.max_depth}\n")
            log_file.write(f"Max pages: {self.max_pages}\n")
            log_file.write(f"External links: {self.scrape_external_links}\n\n")

        self.setup_driver(headless)
        self.documents = []

    def setup_driver(self, headless):
        """Set up the Chrome driver with options."""
        if not SELENIUM_AVAILABLE:
            raise ImportError("Selenium is required to run the scraper but is not installed.")
        try:
            chrome_options = Options()
            if headless:
                chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--disable-logging")
            chrome_options.add_argument("--log-level=3")
            chrome_options.add_argument("--disable-extensions")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--window-size=1920,1080")
            chrome_options.add_argument("--disable-web-security")
            chrome_options.add_argument("--allow-running-insecure-content")
            chrome_options.add_argument("--disable-features=VizDisplayCompositor")
            chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
            chrome_options.binary_location = "/usr/bin/chromium"
            chrome_options.add_experimental_option('excludeSwitches', ['enable-logging'])
            chrome_options.add_experimental_option('useAutomationExtension', False)

            # Selenium 4 removed the executable_path argument, so both paths
            # construct the driver through a Service object.
            try:
                self.driver = webdriver.Chrome(
                    service=Service("/usr/bin/chromedriver"),
                    options=chrome_options
                )
            except Exception:
                # Fall back to webdriver-manager if the system chromedriver is unusable.
                from webdriver_manager.chrome import ChromeDriverManager
                self.driver = webdriver.Chrome(
                    service=Service(ChromeDriverManager().install()),
                    options=chrome_options
                )
            self.log_message("Chrome driver initialized successfully")
        except Exception as e:
            self.log_message(f"Error setting up Chrome driver: {e}")
            raise

    def log_message(self, message):
        """Write message to console and log file."""
        print(message)
        with open(self.log_path, "a", encoding="utf-8") as log_file:
            log_file.write(f"{message}\n")

    def is_valid_url(self, url):
        """Check if URL should be scraped."""
        if not self.scrape_external_links and not url.startswith(self.base_domain):
            return False
        # Skip links to binary and asset files.
        if re.search(r"\.(pdf|doc|docx|xls|xlsx|ppt|pptx|jpg|jpeg|png|gif|svg|ico|css|js|xml|json|zip|tar|gz|rar|7z|exe|dmg|mp3|mp4|avi|mov|wmv)$", url, re.IGNORECASE):
            return False
        if "#" in url:
            url = url.split("#")[0]
        if url in self.visited_links:
            return False
        skip_patterns = [
            '/login', '/register', '/signup', '/sign-up', '/signin', '/sign-in',
            '/logout', '/password', '/forgot', '/reset', '/admin', '/dashboard',
            '/account', '/profile', '/cart', '/checkout', '/payment', '/billing',
            '/terms', '/privacy', '/legal', '/disclaimer', '/sitemap',
            '/robots.txt', '/favicon'
        ]
        url_lower = url.lower()
        for pattern in skip_patterns:
            if pattern in url_lower:
                return False
        spam_patterns = ['popup', 'advertisement', 'tracking', 'analytics']
        for pattern in spam_patterns:
            if pattern in url_lower:
                return False
        return True

    def sanitize_filename(self, text):
        """Convert text to a safe filename."""
        if not text or len(text.strip()) == 0:
            return f"page_{self.page_count}"
        safe_name = re.sub(r'[^\w\s()-]', "_", text)
        safe_name = re.sub(r'\s+', "_", safe_name)
        safe_name = safe_name.strip("_")
        return safe_name[:100] if len(safe_name) > 100 else safe_name

    def extract_links(self):
        """Extract valid links from the current page."""
        links = self.driver.find_elements(By.TAG_NAME, "a")
        valid_links = []
        for link in links:
            try:
                href = link.get_attribute("href")
                if not href:
                    continue
                # Resolve relative URLs against the current page and drop fragments.
                href = urllib.parse.urljoin(self.driver.current_url, href)
                href = urllib.parse.urldefrag(href)[0]
                if not href.startswith(("http://", "https://")):
                    continue
                if self.is_valid_url(href) and href not in self.visited_links:
                    valid_links.append(href)
            except Exception:
                continue
        return list(set(valid_links))

    def extract_main_content(self, soup):
        """Extract the main content element using the configured selectors."""
        content_element = None
        for selector in self.content_selectors:
            try:
                if selector.startswith('.') or selector.startswith('#'):
                    elements = soup.select(selector)
                else:
                    elements = soup.find_all(selector)
                if elements:
                    content_element = elements[0]
                    break
            except Exception:
                continue
        if not content_element:
            content_element = soup.find('body')
        return content_element

    def extract_clean_text(self, soup):
        """Extract and clean text from a BeautifulSoup object."""
        unwanted_tags = [
            "script", "style", "nav",
            "footer", "header", "aside", "advertisement",
            "ads", "popup", "modal", "cookie-notice"
        ]
        for tag in unwanted_tags:
            for element in soup.find_all(tag):
                element.decompose()

        unwanted_classes = [
            "sidebar", "menu", "navigation", "nav", "footer", "header",
            "advertisement", "ad", "ads", "popup", "modal", "cookie",
            "social", "share", "comment", "related", "recommended"
        ]
        for class_name in unwanted_classes:
            for element in soup.find_all(class_=re.compile(class_name, re.I)):
                element.decompose()
            for element in soup.find_all(id=re.compile(class_name, re.I)):
                element.decompose()

        main_content = self.extract_main_content(soup)
        # Use a newline separator so the per-line cleanup below has lines to work on.
        if main_content:
            text = main_content.get_text(separator="\n", strip=True)
        else:
            text = soup.get_text(separator="\n", strip=True)

        lines = [line.strip() for line in text.split('\n') if line.strip()]
        cleaned_text = '\n'.join(lines)
        cleaned_text = re.sub(r'\n\s*\n', '\n\n', cleaned_text)
        cleaned_text = re.sub(r' +', ' ', cleaned_text)
        return cleaned_text

    def scrape_page(self, url):
        """Scrape content from a single page and save it as markdown."""
        if url in self.visited_links:
            return []
        self.page_count += 1
        self.visited_links.add(url)
        status = f"Scraping [{self.page_count}/{self.max_pages}]: {url}"
        self.log_message(status)
        try:
            self.driver.get(url)
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            time.sleep(self.delay)

            try:
                page_title = self.driver.title or f"Page_{self.page_count}"
            except Exception:
                page_title = f"Page_{self.page_count}"

            soup = BeautifulSoup(self.driver.page_source, "html.parser")
            cleaned_text = self.extract_clean_text(soup)
            if len(cleaned_text.strip()) < 50:
                self.log_message(f"Skipping {url}: insufficient content")
                return self.extract_links()

            meta_desc = ""
            meta_tag = soup.find("meta", attrs={"name": "description"})
            if meta_tag:
                meta_desc = meta_tag.get("content", "")

            doc = {
                "text": cleaned_text,
                "metadata": {
                    "source": url,
                    "title": page_title,
                    "site_name": self.site_name,
                    "site_description": self.site_description,
                    "site_category": self.site_category,
                    "meta_description": meta_desc,
                    "domain": self.domain_name,
                    "scraped_at": datetime.now().isoformat()
                }
            }
            self.documents.append(doc)

            safe_filename = self.sanitize_filename(page_title)
            file_path = os.path.join(self.output_dir, f"{safe_filename}.md")
            # Avoid overwriting pages that share a title by appending a counter.
            counter = 1
            original_path = file_path
            while os.path.exists(file_path):
                base, ext = os.path.splitext(original_path)
                file_path = f"{base}_{counter}{ext}"
                counter += 1

            with open(file_path, "w", encoding="utf-8") as file:
                file.write(f"# {page_title}\n\n")
                file.write(f"**URL:** {url}\n")
                file.write(f"**Site:** {self.site_name}\n")
                file.write(f"**Category:** {self.site_category}\n")
                if meta_desc:
                    file.write(f"**Description:** {meta_desc}\n")
                file.write(f"**Scraped:** {datetime.now()}\n\n")
                file.write("---\n\n")
                file.write(cleaned_text)

            self.log_message(f"Saved: {os.path.basename(file_path)}")
            new_links = self.extract_links()
            self.log_message(f"Found {len(new_links)} new links")
            return new_links
        except Exception as e:
            self.log_message(f"Error scraping {url}: {str(e)}")
            return []

    def create_summary(self):
        """Create a summary of the scraped content."""
        summary_path = os.path.join(self.output_dir, "scraping_summary.md")
        with open(summary_path, "w", encoding="utf-8") as f:
            f.write(f"# Scraping Summary: {self.site_name}\n\n")
            f.write(f"**Website:** {self.site_name}\n")
            f.write(f"**URL:** {self.base_url}\n")
            f.write(f"**Domain:** {self.domain_name}\n")
            f.write(f"**Category:** {self.site_category}\n")
            f.write(f"**Description:** {self.site_description}\n\n")
            f.write("**Scraping Details:**\n")
            f.write(f"- Start time: {self.start_time}\n")
            f.write(f"- End time: {datetime.now()}\n")
            f.write(f"- Duration: {datetime.now() - self.start_time}\n")
            f.write(f"- Pages scraped: {len(self.documents)}\n")
            f.write(f"- Max pages allowed: {self.max_pages}\n")
            f.write(f"- Max depth: {self.max_depth}\n")
            f.write(f"- External links allowed: {self.scrape_external_links}\n\n")
            if self.documents:
                f.write("**Scraped Pages:**\n")
                for i, doc in enumerate(self.documents, 1):
                    f.write(f"{i}. [{doc['metadata']['title']}]({doc['metadata']['source']})\n")

    def start(self):
        """Start the website scraping process."""
        try:
            self.log_message(f"Starting website scraping for {self.site_name}")
            self.log_message(f"Target: {self.base_url}")
            self.log_message(f"Limits: max_depth={self.max_depth}, max_pages={self.max_pages}")

            # Breadth-first crawl: each queue entry is a (url, depth) pair.
            urls_to_scrape = [(self.base_url, 0)]
            while urls_to_scrape and self.page_count < self.max_pages:
                current_url, current_depth = urls_to_scrape.pop(0)
                if current_url in self.visited_links or current_depth > self.max_depth:
                    continue
                new_links = self.scrape_page(current_url)
                if current_depth + 1 <= self.max_depth:
                    for link in new_links:
                        if link not in self.visited_links:
                            urls_to_scrape.append((link, current_depth + 1))

            self.create_summary()
            self.driver.quit()

            end_time = datetime.now()
            duration = end_time - self.start_time
            self.log_message(f"Scraping completed for {self.site_name}")
            self.log_message(f"Total pages scraped: {self.page_count}")
            self.log_message(f"Duration: {duration}")
            return {
                "success": True,
                "pages_scraped": self.page_count,
                "duration": str(duration),
                "output_dir": self.output_dir
            }
        except Exception as e:
            self.driver.quit()
            self.log_message(f"Scraping failed: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "pages_scraped": self.page_count,
                "duration": "0",
                "output_dir": self.output_dir
            }
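

# Example usage: a minimal sketch of driving the scraper end to end. The URL,
# site name, and limits below are illustrative placeholders, not values from
# the original module; adjust them for your own target site.
if __name__ == "__main__":
    scraper = WebsiteScraper(
        base_url="https://example.com",
        site_name="ExampleSite",
        site_description="Placeholder description for a demo run",
        site_category="Documentation",
        max_depth=2,
        max_pages=10,
        delay=1.5,
        headless=True,
    )
    result = scraper.start()
    if result["success"]:
        print(f"Scraped {result['pages_scraped']} pages into {result['output_dir']}")
    else:
        print(f"Scraping failed: {result['error']}")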