import requests
from bs4 import BeautifulSoup
import time
import re
from urllib.parse import urlparse, urljoin
import sqlite3

# Optional Selenium imports for advanced scraping
try:
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.chrome.service import Service
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from webdriver_manager.chrome import ChromeDriverManager
    SELENIUM_AVAILABLE = True
except ImportError:
    SELENIUM_AVAILABLE = False
    print("⚠️ Selenium not available. Company research will use basic scraping only.")


class LinkedInScraper:
    def __init__(self, timeout=10, use_selenium=False):
        self.timeout = timeout
        self.use_selenium = use_selenium
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })

        if self.use_selenium and SELENIUM_AVAILABLE:
            self._setup_selenium()
        elif self.use_selenium and not SELENIUM_AVAILABLE:
            print("⚠️ Selenium requested but not available. Falling back to basic scraping.")
            self.use_selenium = False

    def _setup_selenium(self):
        """Setup Selenium WebDriver"""
        if not SELENIUM_AVAILABLE:
            print("⚠️ Selenium not available. Cannot setup WebDriver.")
            return

        try:
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-gpu')
            chrome_options.add_argument('--window-size=1920,1080')

            # Selenium 4 expects the driver path to be wrapped in a Service object
            self.driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()),
                options=chrome_options
            )
        except Exception as e:
            print(f"Error setting up Selenium: {e}")
            self.use_selenium = False

    def _get_cached_data(self, url):
        """Check if URL data is cached in database"""
        try:
            conn = sqlite3.connect('leads.db')
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS scraped_cache (
                    url TEXT PRIMARY KEY,
                    content TEXT,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('SELECT content FROM scraped_cache WHERE url = ?', (url,))
            result = cursor.fetchone()
            conn.close()
            return result[0] if result else None
        except Exception as e:
            print(f"Cache error: {e}")
            return None

    def _cache_data(self, url, content):
        """Cache scraped data"""
        try:
            conn = sqlite3.connect('leads.db')
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO scraped_cache (url, content)
                VALUES (?, ?)
            ''', (url, content))
            conn.commit()
            conn.close()
        except Exception as e:
            print(f"Cache save error: {e}")

    def scrape_with_requests(self, url):
        """Scrape URL using requests and BeautifulSoup"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract various content types
            content_parts = []

            # Try to get meta description
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                content_parts.append(f"Description: {meta_desc.get('content', '')}")

            # Try to get title
            title = soup.find('title')
            if title:
                content_parts.append(f"Title: {title.get_text().strip()}")

            # Try to get about section or main content
            about_selectors = [
                '.about-section', '.company-description', '.about-us',
                '[class*="about"]', '.description', '.summary',
                'main', '.content'
            ]

            for selector in about_selectors:
                elements = soup.select(selector)
                for element in elements:
                    text = element.get_text().strip()
                    if len(text) > 50:  # Only meaningful content
                        content_parts.append(text[:500])  # Limit length
                        break
                if content_parts:
                    break

            # If no specific content found, get paragraphs
            if not content_parts:
                paragraphs = soup.find_all('p')
                for p in paragraphs[:3]:  # First 3 paragraphs
                    text = p.get_text().strip()
                    if len(text) > 30:
                        content_parts.append(text[:300])

            return ' | '.join(content_parts) if content_parts else "No content extracted"

        except Exception as e:
            return f"Error scraping {url}: {str(e)}"

    def scrape_with_selenium(self, url):
        """Scrape URL using Selenium"""
        try:
            self.driver.get(url)
            WebDriverWait(self.driver, self.timeout).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )

            # Wait a bit for dynamic content
            time.sleep(2)

            content_parts = []

            # Try different selectors for LinkedIn
            linkedin_selectors = [
                '[data-test-id="about-us-description"]',
                '.company-about-us-description',
                '.about-section',
                '[class*="about"]'
            ]

            for selector in linkedin_selectors:
                try:
                    elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                    for element in elements:
                        text = element.text.strip()
                        if len(text) > 50:
                            content_parts.append(text[:500])
                            break
                except Exception:
                    continue

            # If no LinkedIn-specific content, try general selectors
            if not content_parts:
                general_selectors = ['main', '.content', 'article', '.description']
                for selector in general_selectors:
                    try:
                        elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                        for element in elements:
                            text = element.text.strip()
                            if len(text) > 50:
                                content_parts.append(text[:500])
                                break
                    except Exception:
                        continue

            return ' | '.join(content_parts) if content_parts else "No content extracted"

        except Exception as e:
            return f"Error scraping {url} with Selenium: {str(e)}"

    def scrape_linkedin_profile(self, linkedin_url):
        """Scrape LinkedIn company profile"""
        if not linkedin_url or not linkedin_url.strip():
            return "No LinkedIn URL provided"

        # Check cache first
        cached_content = self._get_cached_data(linkedin_url)
        if cached_content:
            return cached_content

        try:
            # Clean URL
            linkedin_url = linkedin_url.strip()
            if not linkedin_url.startswith('http'):
                linkedin_url = 'https://' + linkedin_url

            # Use appropriate scraping method
            if self.use_selenium:
                content = self.scrape_with_selenium(linkedin_url)
            else:
                content = self.scrape_with_requests(linkedin_url)

            # Cache the result
            self._cache_data(linkedin_url, content)

            return content

        except Exception as e:
            return f"Error accessing LinkedIn: {str(e)}"

    def scrape_linkedin_company(self, linkedin_url):
        """Alias for scrape_linkedin_profile - for compatibility"""
        return self.scrape_linkedin_profile(linkedin_url)

    def scrape_company_data(self, linkedin_url):
        """Another alias for compatibility"""
        return self.scrape_linkedin_profile(linkedin_url)

    def scrape_company_website(self, company_name):
        """Scrape company website as fallback"""
        try:
            # Try to construct company website URL
            company_clean = re.sub(r'[^\w\s-]', '', company_name.lower())
            company_clean = re.sub(r'\s+', '', company_clean)

            possible_urls = [
                f"https://{company_clean}.com",
                f"https://www.{company_clean}.com",
                f"https://{company_clean}.org",
                f"https://www.{company_clean}.org"
            ]

            for url in possible_urls:
                cached_content = self._get_cached_data(url)
                if cached_content:
                    return cached_content

                try:
                    if self.use_selenium:
                        content = self.scrape_with_selenium(url)
                    else:
                        content = self.scrape_with_requests(url)

                    if "Error" not in content and len(content) > 50:
                        self._cache_data(url, content)
                        return content
                except Exception:
                    continue

            return f"Could not find website for {company_name}"

        except Exception as e:
            return f"Error finding company website: {str(e)}"

    def scrape_linkedin_or_company(self, linkedin_url, company_name):
        """Main method to scrape LinkedIn or fall back to the company website"""
        # First try LinkedIn
        if linkedin_url and linkedin_url.strip():
            linkedin_content = self.scrape_linkedin_profile(linkedin_url)
            if "Error" not in linkedin_content and len(linkedin_content) > 50:
                return f"LinkedIn: {linkedin_content}"

        # Fallback to company website
        company_content = self.scrape_company_website(company_name)
        return f"Company Website: {company_content}"

    def __del__(self):
        """Clean up Selenium driver"""
        if hasattr(self, 'driver'):
            try:
                self.driver.quit()
            except Exception:
                pass


# Standalone function for easy import
def scrape_company_info(input_data):
    """
    Scrape company information from a LinkedIn URL or company name

    Args:
        input_data (str): LinkedIn URL or company name

    Returns:
        str: Scraped company information, or an error message if dependencies are missing
    """
    if not SELENIUM_AVAILABLE:
        return "Company research feature requires additional setup. Please install selenium and webdriver-manager for enterprise features."

    try:
        scraper = LinkedInScraper()

        # Check if input is a LinkedIn URL
        if 'linkedin.com' in input_data.lower():
            result = scraper.scrape_linkedin_or_company(input_data, "")
        else:
            # Treat as company name
            result = scraper.scrape_company_website(input_data)

        return result if result else ""

    except Exception as e:
        print(f"Error in scrape_company_info: {e}")
        return ""
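

# Example usage (illustrative sketch only): the LinkedIn URL and company name
# below are placeholders, not verified targets, and network access is assumed.
# Running the module directly exercises the basic requests-based path.
if __name__ == "__main__":
    scraper = LinkedInScraper(use_selenium=False)
    print(scraper.scrape_linkedin_or_company(
        "https://www.linkedin.com/company/example", "Example Company"
    ))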