import time
import os
import re
import urllib.parse
from datetime import datetime

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup


class WebsiteScraper:
    def __init__(self, base_url, site_name, site_description="", site_category="General",
                 output_dir=None, max_depth=3, max_pages=50, delay=2, headless=True,
                 scrape_external_links=False, content_selectors=None):
        """
        Initialize the website scraper.

        Args:
            base_url (str): Starting URL to scrape
            site_name (str): Name of the website
            site_description (str): Description of the website
            site_category (str): Category of the website
            output_dir (str): Directory to save files (auto-generated if None)
            max_depth (int): Maximum depth to crawl
            max_pages (int): Maximum number of pages to scrape
            delay (float): Delay between requests in seconds
            headless (bool): Run browser in headless mode
            scrape_external_links (bool): Whether to follow external links
            content_selectors (list): CSS selectors to find main content
        """
        parsed_url = urllib.parse.urlparse(base_url)
        self.base_url = base_url
        self.base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
        self.domain_name = parsed_url.netloc
        self.site_name = site_name
        self.site_description = site_description
        self.site_category = site_category
        self.scrape_external_links = scrape_external_links
        self.content_selectors = content_selectors or [
            'main', 'article', '.content', '#content', '.main-content',
            '.post-content', '.entry-content', '.page-content', 'body'
        ]
        self.max_depth = max_depth
        self.max_pages = max_pages
        self.delay = delay
        self.visited_links = set()
        self.page_count = 0
        self.start_time = datetime.now()

        # Create output directory
        if output_dir is None:
            domain_safe = self.domain_name.replace(".", "_").replace(":", "_")
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.output_dir = f"{site_name}_{domain_safe}_{timestamp}"
        else:
            self.output_dir = output_dir

        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

        # Initialize log file
        self.log_path = os.path.join(self.output_dir, "scraping_log.txt")
        with open(self.log_path, "w", encoding="utf-8") as log_file:
            log_file.write(f"Website scraping started at: {self.start_time}\n")
            log_file.write(f"Website: {self.site_name}\n")
            log_file.write(f"Description: {self.site_description}\n")
            log_file.write(f"Category: {self.site_category}\n")
            log_file.write(f"Base URL: {self.base_url}\n")
            log_file.write(f"Domain: {self.domain_name}\n")
            log_file.write(f"Max depth: {self.max_depth}\n")
            log_file.write(f"Max pages: {self.max_pages}\n")
            log_file.write(f"External links: {self.scrape_external_links}\n\n")

        # Initialize driver
        self.driver = None
        self.documents = []

        # Set up the Chrome driver
        self.setup_driver(headless)

    def setup_driver(self, headless=True):
        """Set up the Chrome driver with options for Gradio Spaces compatibility."""
        try:
            chrome_options = Options()

            # Essential headless options
            if headless:
                chrome_options.add_argument("--headless")

            # Core stability options
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--disable-features=VizDisplayCompositor")
            chrome_options.add_argument("--disable-web-security")
            chrome_options.add_argument("--allow-running-insecure-content")

            # Logging and extension options
chrome_options.add_argument("--disable-logging") chrome_options.add_argument("--log-level=3") chrome_options.add_argument("--disable-extensions") chrome_options.add_argument("--disable-plugins") # Window and display options chrome_options.add_argument("--window-size=1920,1080") chrome_options.add_argument("--start-maximized") # User agent chrome_options.add_argument("--user-agent=Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36") # Experimental options chrome_options.add_experimental_option('excludeSwitches', ['enable-logging']) chrome_options.add_experimental_option('useAutomationExtension', False) # Memory and performance options chrome_options.add_argument("--memory-pressure-off") chrome_options.add_argument("--max_old_space_size=4096") # Try different Chrome binary locations for different environments chrome_binaries = [ "/usr/bin/chromium", "/usr/bin/chromium-browser", "/usr/bin/google-chrome", "/usr/bin/google-chrome-stable" ] for binary in chrome_binaries: if os.path.exists(binary): chrome_options.binary_location = binary break # Try different driver approaches try: # Try system chromedriver first if os.path.exists("/usr/bin/chromedriver"): from selenium.webdriver.chrome.service import Service service = Service("/usr/bin/chromedriver") self.driver = webdriver.Chrome(service=service, options=chrome_options) else: # Fallback to webdriver-manager try: from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.chrome.service import Service service = Service(ChromeDriverManager().install()) self.driver = webdriver.Chrome(service=service, options=chrome_options) except: # Last resort - try without service self.driver = webdriver.Chrome(options=chrome_options) except Exception as e: self.log_message(f"Chrome setup failed: {e}") # Try with minimal options as last resort minimal_options = Options() minimal_options.add_argument("--headless") minimal_options.add_argument("--no-sandbox") minimal_options.add_argument("--disable-dev-shm-usage") self.driver = webdriver.Chrome(options=minimal_options) self.log_message("Chrome driver initialized successfully") except Exception as e: self.log_message(f"Error setting up Chrome driver: {e}") raise Exception(f"Failed to initialize Chrome driver: {e}") def log_message(self, message): """Write message to console and log file.""" print(message) try: with open(self.log_path, "a", encoding="utf-8") as log_file: log_file.write(f"{datetime.now()}: {message}\n") except: pass # Continue even if logging fails def is_valid_url(self, url): """Check if URL should be scraped.""" if not url: return False # Check external links if not self.scrape_external_links and not url.startswith(self.base_domain): return False # Skip file extensions if re.search(r"\.(pdf|doc|docx|xls|xlsx|ppt|pptx|jpg|jpeg|png|gif|svg|ico|css|js|xml|json|zip|tar|gz|rar|7z|exe|dmg|mp3|mp4|avi|mov|wmv)$", url, re.IGNORECASE): return False # Handle fragments if "#" in url: url = url.split("#")[0] if url in self.visited_links: return False # Skip common patterns skip_patterns = [ '/login', '/register', '/signup', '/sign-up', '/signin', '/sign-in', '/logout', '/password', '/forgot', '/reset', '/admin', '/dashboard', '/account', '/profile', '/cart', '/checkout', '/payment', '/billing', '/terms', '/privacy', '/legal', '/disclaimer', '/sitemap', '/robots.txt', '/favicon' ] url_lower = url.lower() for pattern in skip_patterns: if pattern in url_lower: return False # Skip spam patterns spam_patterns = ['popup', 'advertisement', 
        spam_patterns = ['popup', 'advertisement', 'tracking', 'analytics']
        for pattern in spam_patterns:
            if pattern in url_lower:
                return False

        return True

    def sanitize_filename(self, text):
        """Convert text to safe filename."""
        if not text or len(text.strip()) == 0:
            return f"page_{self.page_count}"
        safe_name = re.sub(r'[^\w\s()-]', "_", text)
        safe_name = re.sub(r'\s+', "_", safe_name)
        safe_name = safe_name.strip("_")
        return safe_name[:100] if len(safe_name) > 100 else safe_name

    def extract_links(self):
        """Extract valid links from current page."""
        links = []
        try:
            link_elements = self.driver.find_elements(By.TAG_NAME, "a")
            for link in link_elements:
                try:
                    href = link.get_attribute("href")
                    if href:
                        # Resolve root-relative and relative URLs against the current page
                        if href.startswith('/'):
                            href = self.base_domain + href
                        elif href.startswith('./') or not href.startswith('http'):
                            current_url = self.driver.current_url
                            base_path = '/'.join(current_url.split('/')[:-1])
                            href = base_path + '/' + href.lstrip('./')
                        if self.is_valid_url(href) and href not in self.visited_links:
                            links.append(href)
                except Exception:
                    continue
        except Exception as e:
            self.log_message(f"Error extracting links: {e}")
        return list(set(links))

    def extract_main_content(self, soup):
        """Extract main content using various selectors."""
        content_element = None
        for selector in self.content_selectors:
            try:
                if selector.startswith('.') or selector.startswith('#'):
                    elements = soup.select(selector)
                else:
                    elements = soup.find_all(selector)
                if elements:
                    content_element = elements[0]
                    break
            except Exception:
                continue
        if not content_element:
            content_element = soup.find('body')
        return content_element

    def extract_clean_text(self, soup):
        """Extract and clean text from BeautifulSoup object."""
        # Remove unwanted tags
        unwanted_tags = [
            "script", "style", "nav", "footer", "header", "aside",
            "advertisement", "ads", "popup", "modal", "cookie-notice"
        ]
        for tag in unwanted_tags:
            for element in soup.find_all(tag):
                element.decompose()

        # Remove elements with unwanted classes and IDs
        unwanted_classes = [
            "sidebar", "menu", "navigation", "nav", "footer", "header",
            "advertisement", "ad", "ads", "popup", "modal", "cookie",
            "social", "share", "comment", "related", "recommended"
        ]
        for class_name in unwanted_classes:
            for element in soup.find_all(class_=re.compile(class_name, re.I)):
                element.decompose()
            for element in soup.find_all(id=re.compile(class_name, re.I)):
                element.decompose()

        # Extract main content
        main_content = self.extract_main_content(soup)
        if main_content:
            text = main_content.get_text(separator=" ", strip=True)
        else:
            text = soup.get_text(separator=" ", strip=True)

        # Clean up text
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        cleaned_text = '\n'.join(lines)
        cleaned_text = re.sub(r'\n\s*\n', '\n\n', cleaned_text)
        cleaned_text = re.sub(r' +', ' ', cleaned_text)
        return cleaned_text

    def scrape_page(self, url):
        """Scrape content from a single page and save as markdown."""
        if url in self.visited_links:
            return []

        self.page_count += 1
        self.visited_links.add(url)
        status = f"Scraping [{self.page_count}/{self.max_pages}]: {url}"
        self.log_message(status)

        try:
            self.driver.get(url)
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            time.sleep(self.delay)

            # Get page title
            try:
                page_title = self.driver.title or f"Page_{self.page_count}"
            except Exception:
                page_title = f"Page_{self.page_count}"

            # Parse page content
            soup = BeautifulSoup(self.driver.page_source, "html.parser")
            cleaned_text = self.extract_clean_text(soup)

            # Skip pages with insufficient content, but still follow their links
            if len(cleaned_text.strip()) < 50:
                self.log_message(f"Skipping {url}: insufficient content")
                return self.extract_links()

            # Extract meta description
            meta_desc = ""
            meta_tag = soup.find("meta", attrs={"name": "description"})
            if meta_tag:
                meta_desc = meta_tag.get("content", "")

            # Create document
            doc = {
                "text": cleaned_text,
                "metadata": {
                    "source": url,
                    "title": page_title,
                    "site_name": self.site_name,
                    "site_description": self.site_description,
                    "site_category": self.site_category,
                    "meta_description": meta_desc,
                    "domain": self.domain_name,
                    "scraped_at": datetime.now().isoformat()
                }
            }
            self.documents.append(doc)

            # Save to file
            safe_filename = self.sanitize_filename(page_title)
            file_path = os.path.join(self.output_dir, f"{safe_filename}.md")

            # Handle duplicate filenames
            counter = 1
            original_path = file_path
            while os.path.exists(file_path):
                base, ext = os.path.splitext(original_path)
                file_path = f"{base}_{counter}{ext}"
                counter += 1

            # Write markdown file
            with open(file_path, "w", encoding="utf-8") as file:
                file.write(f"# {page_title}\n\n")
                file.write(f"**URL:** {url}\n")
                file.write(f"**Site:** {self.site_name}\n")
                file.write(f"**Category:** {self.site_category}\n")
                if meta_desc:
                    file.write(f"**Description:** {meta_desc}\n")
                file.write(f"**Scraped:** {datetime.now()}\n\n")
                file.write("---\n\n")
                file.write(cleaned_text)

            self.log_message(f"Saved: {os.path.basename(file_path)}")

            # Extract new links
            new_links = self.extract_links()
            self.log_message(f"Found {len(new_links)} new links")
            return new_links

        except Exception as e:
            self.log_message(f"Error scraping {url}: {str(e)}")
            return []

    def create_summary(self):
        """Create a summary of the scraped content."""
        summary_path = os.path.join(self.output_dir, "scraping_summary.md")
        try:
            with open(summary_path, "w", encoding="utf-8") as f:
                f.write(f"# Scraping Summary: {self.site_name}\n\n")
                f.write(f"**Website:** {self.site_name}\n")
                f.write(f"**URL:** {self.base_url}\n")
                f.write(f"**Domain:** {self.domain_name}\n")
                f.write(f"**Category:** {self.site_category}\n")
                f.write(f"**Description:** {self.site_description}\n\n")
                f.write("**Scraping Details:**\n")
                f.write(f"- Start time: {self.start_time}\n")
                f.write(f"- End time: {datetime.now()}\n")
                f.write(f"- Duration: {datetime.now() - self.start_time}\n")
                f.write(f"- Pages scraped: {len(self.documents)}\n")
                f.write(f"- Max pages allowed: {self.max_pages}\n")
                f.write(f"- Max depth: {self.max_depth}\n")
                f.write(f"- External links allowed: {self.scrape_external_links}\n\n")

                if self.documents:
                    f.write("**Scraped Pages:**\n")
                    for i, doc in enumerate(self.documents, 1):
                        f.write(f"{i}. [{doc['metadata']['title']}]({doc['metadata']['source']})\n")
        except Exception as e:
            self.log_message(f"Error creating summary: {e}")

    def start(self):
        """Start the website scraping process."""
        try:
            self.log_message(f"Starting website scraping for {self.site_name}")
            self.log_message(f"Target: {self.base_url}")
            self.log_message(f"Limits: max_depth={self.max_depth}, max_pages={self.max_pages}")

            urls_to_scrape = [(self.base_url, 0)]

            while urls_to_scrape and self.page_count < self.max_pages:
                current_url, current_depth = urls_to_scrape.pop(0)

                if current_url in self.visited_links or current_depth > self.max_depth:
                    continue

                new_links = self.scrape_page(current_url)

                # Add new links for the next depth level
                if current_depth + 1 <= self.max_depth:
                    for link in new_links:
                        if link not in self.visited_links:
                            urls_to_scrape.append((link, current_depth + 1))

            # Create summary
            self.create_summary()

            # Clean up
            if self.driver:
                self.driver.quit()

            end_time = datetime.now()
            duration = end_time - self.start_time
            self.log_message(f"Scraping completed for {self.site_name}")
            self.log_message(f"Total pages scraped: {self.page_count}")
            self.log_message(f"Duration: {duration}")

            return {
                "success": True,
                "pages_scraped": self.page_count,
                "duration": str(duration),
                "output_dir": self.output_dir
            }

        except Exception as e:
            if self.driver:
                self.driver.quit()
            self.log_message(f"Scraping failed: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "pages_scraped": self.page_count,
                "duration": "0",
                "output_dir": self.output_dir
            }
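

# Usage sketch: a minimal example of driving the scraper above. The URL and
# site metadata here are placeholders for illustration, not values taken from
# any real deployment.
if __name__ == "__main__":
    scraper = WebsiteScraper(
        base_url="https://example.com",  # hypothetical starting URL
        site_name="Example",
        site_description="Example site used for a local test run",
        site_category="Documentation",
        max_depth=2,    # crawl two link levels deep
        max_pages=10,   # stop after ten pages
        delay=1,        # one-second politeness delay between requests
        headless=True,
    )
    result = scraper.start()
    if result["success"]:
        print(f"Scraped {result['pages_scraped']} pages into {result['output_dir']}")
    else:
        print(f"Scraping failed: {result['error']}")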