import requests
from bs4 import BeautifulSoup
import time
import re
import sqlite3
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager


class LinkedInScraper:
    def __init__(self, timeout=10, use_selenium=False):
        self.timeout = timeout
        self.use_selenium = use_selenium
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })

        if self.use_selenium:
            self._setup_selenium()

    def _setup_selenium(self):
        """Set up the Selenium WebDriver."""
        try:
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-gpu')
            chrome_options.add_argument('--window-size=1920,1080')

            # Selenium 4 expects the driver path to be wrapped in a Service object
            self.driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()),
                options=chrome_options
            )
        except Exception as e:
            print(f"Error setting up Selenium: {e}")
            self.use_selenium = False

    def _get_cached_data(self, url):
        """Check if URL data is cached in the database."""
        try:
            conn = sqlite3.connect('leads.db')
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS scraped_cache (
                    url TEXT PRIMARY KEY,
                    content TEXT,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('SELECT content FROM scraped_cache WHERE url = ?', (url,))
            result = cursor.fetchone()
            conn.close()
            return result[0] if result else None
        except Exception as e:
            print(f"Cache error: {e}")
            return None

    def _cache_data(self, url, content):
        """Cache scraped data."""
        try:
            conn = sqlite3.connect('leads.db')
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO scraped_cache (url, content)
                VALUES (?, ?)
            ''', (url, content))
            conn.commit()
            conn.close()
        except Exception as e:
            print(f"Cache save error: {e}")

    def scrape_with_requests(self, url):
        """Scrape a URL using requests and BeautifulSoup."""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract various content types
            content_parts = []

            # Try to get the meta description
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                content_parts.append(f"Description: {meta_desc.get('content', '')}")

            # Try to get the title
            title = soup.find('title')
            if title:
                content_parts.append(f"Title: {title.get_text().strip()}")

            # Try to get the about section or main content
            about_selectors = [
                '.about-section', '.company-description', '.about-us',
                '[class*="about"]', '.description', '.summary',
                'main', '.content'
            ]

            found_about = False
            for selector in about_selectors:
                elements = soup.select(selector)
                for element in elements:
                    text = element.get_text().strip()
                    if len(text) > 50:  # Only keep meaningful content
                        content_parts.append(text[:500])  # Limit length
                        found_about = True
                        break
                if found_about:
                    break

            # If no specific content was found, fall back to paragraphs
            if not content_parts:
                paragraphs = soup.find_all('p')
                for p in paragraphs[:3]:  # First 3 paragraphs
                    text = p.get_text().strip()
                    if len(text) > 30:
                        content_parts.append(text[:300])

            return ' | '.join(content_parts) if content_parts else "No content extracted"

        except Exception as e:
            return f"Error scraping {url}: {str(e)}"

    def scrape_with_selenium(self, url):
        """Scrape a URL using Selenium."""
        try:
            self.driver.get(url)
            WebDriverWait(self.driver, self.timeout).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )

            # Wait a bit for dynamic content
            time.sleep(2)

            content_parts = []

            # Try LinkedIn-specific selectors first
            linkedin_selectors = [
                '[data-test-id="about-us-description"]',
                '.company-about-us-description',
                '.about-section',
                '[class*="about"]'
            ]

            for selector in linkedin_selectors:
                try:
                    elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                    for element in elements:
                        text = element.text.strip()
                        if len(text) > 50:
                            content_parts.append(text[:500])
                            break
                except Exception:
                    continue

            # If no LinkedIn-specific content was found, try general selectors
            if not content_parts:
                general_selectors = ['main', '.content', 'article', '.description']
                for selector in general_selectors:
                    try:
                        elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                        for element in elements:
                            text = element.text.strip()
                            if len(text) > 50:
                                content_parts.append(text[:500])
                                break
                    except Exception:
                        continue

            return ' | '.join(content_parts) if content_parts else "No content extracted"

        except Exception as e:
            return f"Error scraping {url} with Selenium: {str(e)}"

    def scrape_linkedin_profile(self, linkedin_url):
        """Scrape a LinkedIn company profile."""
        if not linkedin_url or not linkedin_url.strip():
            return "No LinkedIn URL provided"

        # Check the cache first
        cached_content = self._get_cached_data(linkedin_url)
        if cached_content:
            return cached_content

        try:
            # Clean up the URL
            linkedin_url = linkedin_url.strip()
            if not linkedin_url.startswith('http'):
                linkedin_url = 'https://' + linkedin_url

            # Use the appropriate scraping method
            if self.use_selenium:
                content = self.scrape_with_selenium(linkedin_url)
            else:
                content = self.scrape_with_requests(linkedin_url)

            # Cache the result
            self._cache_data(linkedin_url, content)
            return content

        except Exception as e:
            return f"Error accessing LinkedIn: {str(e)}"

    def scrape_linkedin_company(self, linkedin_url):
        """Alias for scrape_linkedin_profile, kept for compatibility."""
        return self.scrape_linkedin_profile(linkedin_url)

    def scrape_company_website(self, company_name):
        """Scrape the company website as a fallback."""
        try:
            # Try to construct likely company website URLs from the name
            company_clean = re.sub(r'[^\w\s-]', '', company_name.lower())
            company_clean = re.sub(r'\s+', '', company_clean)

            possible_urls = [
                f"https://{company_clean}.com",
                f"https://www.{company_clean}.com",
                f"https://{company_clean}.org",
                f"https://www.{company_clean}.org"
            ]

            for url in possible_urls:
                cached_content = self._get_cached_data(url)
                if cached_content:
                    return cached_content

                try:
                    if self.use_selenium:
                        content = self.scrape_with_selenium(url)
                    else:
                        content = self.scrape_with_requests(url)

                    if "Error" not in content and len(content) > 50:
                        self._cache_data(url, content)
                        return content
                except Exception:
                    continue

            return f"Could not find website for {company_name}"

        except Exception as e:
            return f"Error finding company website: {str(e)}"

    def scrape_linkedin_or_company(self, linkedin_url, company_name):
        """Main entry point: scrape LinkedIn, or fall back to the company website."""
        # First try LinkedIn
        if linkedin_url and linkedin_url.strip():
            linkedin_content = self.scrape_linkedin_profile(linkedin_url)
            if "Error" not in linkedin_content and len(linkedin_content) > 50:
                return f"LinkedIn: {linkedin_content}"

        # Fall back to the company website
        company_content = self.scrape_company_website(company_name)
        return f"Company Website: {company_content}"

    def __del__(self):
        """Clean up the Selenium driver."""
        if hasattr(self, 'driver'):
            try:
                self.driver.quit()
            except Exception:
                pass
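

# Usage sketch: a minimal example of how LinkedInScraper might be driven.
# The LinkedIn URL and company name below are hypothetical placeholders,
# and the printed summary depends entirely on what the target pages return.
if __name__ == "__main__":
    scraper = LinkedInScraper(timeout=10, use_selenium=False)
    summary = scraper.scrape_linkedin_or_company(
        "https://www.linkedin.com/company/example-co",  # hypothetical URL
        "Example Co",                                    # hypothetical company name
    )
    print(summary)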