# cold-email-assistant / scraper.py
import requests
from bs4 import BeautifulSoup
import time
import re
from urllib.parse import urlparse, urljoin
import sqlite3
# Optional Selenium imports for advanced scraping
try:
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.chrome.service import Service
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from webdriver_manager.chrome import ChromeDriverManager
SELENIUM_AVAILABLE = True
except ImportError:
SELENIUM_AVAILABLE = False
print("⚠️ Selenium not available. Company research will use basic scraping only.")
class LinkedInScraper:
def __init__(self, timeout=10, use_selenium=False):
self.timeout = timeout
self.use_selenium = use_selenium
self.session = requests.Session()
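        # Browser-like headers so simple bot filters are less likely to reject requests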
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
})
if self.use_selenium and SELENIUM_AVAILABLE:
self._setup_selenium()
        elif self.use_selenium and not SELENIUM_AVAILABLE:
            print("⚠️ Selenium requested but not available. Falling back to basic scraping.")
            self.use_selenium = False
def _setup_selenium(self):
"""Setup Selenium WebDriver"""
if not SELENIUM_AVAILABLE:
print("⚠️ Selenium not available. Cannot setup WebDriver.")
return
try:
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--window-size=1920,1080')
            # Selenium 4 expects the driver path to be wrapped in a Service object
            self.driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()),
                options=chrome_options
            )
except Exception as e:
print(f"Error setting up Selenium: {e}")
self.use_selenium = False
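    # Scraped page content is cached indefinitely in the local SQLite file
    # 'leads.db' (table: scraped_cache); drop that table to force a re-fetch.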
def _get_cached_data(self, url):
"""Check if URL data is cached in database"""
try:
conn = sqlite3.connect('leads.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS scraped_cache (
url TEXT PRIMARY KEY,
content TEXT,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('SELECT content FROM scraped_cache WHERE url = ?', (url,))
result = cursor.fetchone()
conn.close()
return result[0] if result else None
except Exception as e:
print(f"Cache error: {e}")
return None
def _cache_data(self, url, content):
"""Cache scraped data"""
try:
conn = sqlite3.connect('leads.db')
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO scraped_cache (url, content)
VALUES (?, ?)
''', (url, content))
conn.commit()
conn.close()
except Exception as e:
print(f"Cache save error: {e}")
def scrape_with_requests(self, url):
"""Scrape URL using requests and BeautifulSoup"""
try:
response = self.session.get(url, timeout=self.timeout)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# Extract various content types
content_parts = []
# Try to get meta description
meta_desc = soup.find('meta', attrs={'name': 'description'})
if meta_desc:
content_parts.append(f"Description: {meta_desc.get('content', '')}")
# Try to get title
title = soup.find('title')
if title:
content_parts.append(f"Title: {title.get_text().strip()}")
# Try to get about section or main content
about_selectors = [
'.about-section',
'.company-description',
'.about-us',
'[class*="about"]',
'.description',
'.summary',
'main',
'.content'
]
for selector in about_selectors:
elements = soup.select(selector)
for element in elements:
text = element.get_text().strip()
if len(text) > 50: # Only meaningful content
content_parts.append(text[:500]) # Limit length
break
if content_parts:
break
# If no specific content found, get paragraphs
if not content_parts:
paragraphs = soup.find_all('p')
for p in paragraphs[:3]: # First 3 paragraphs
text = p.get_text().strip()
if len(text) > 30:
content_parts.append(text[:300])
return ' | '.join(content_parts) if content_parts else "No content extracted"
except Exception as e:
return f"Error scraping {url}: {str(e)}"
def scrape_with_selenium(self, url):
"""Scrape URL using Selenium"""
try:
self.driver.get(url)
WebDriverWait(self.driver, self.timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# Wait a bit for dynamic content
time.sleep(2)
content_parts = []
# Try different selectors for LinkedIn
linkedin_selectors = [
'[data-test-id="about-us-description"]',
'.company-about-us-description',
'.about-section',
'[class*="about"]'
]
for selector in linkedin_selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
for element in elements:
text = element.text.strip()
if len(text) > 50:
content_parts.append(text[:500])
break
            except Exception:
continue
# If no LinkedIn-specific content, try general selectors
if not content_parts:
general_selectors = ['main', '.content', 'article', '.description']
for selector in general_selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
for element in elements:
text = element.text.strip()
if len(text) > 50:
content_parts.append(text[:500])
break
                except Exception:
continue
return ' | '.join(content_parts) if content_parts else "No content extracted"
except Exception as e:
return f"Error scraping {url} with Selenium: {str(e)}"
def scrape_linkedin_profile(self, linkedin_url):
"""Scrape LinkedIn company profile"""
if not linkedin_url or not linkedin_url.strip():
return "No LinkedIn URL provided"
# Check cache first
cached_content = self._get_cached_data(linkedin_url)
if cached_content:
return cached_content
try:
# Clean URL
linkedin_url = linkedin_url.strip()
if not linkedin_url.startswith('http'):
linkedin_url = 'https://' + linkedin_url
# Use appropriate scraping method
if self.use_selenium:
content = self.scrape_with_selenium(linkedin_url)
else:
content = self.scrape_with_requests(linkedin_url)
# Cache the result
self._cache_data(linkedin_url, content)
return content
except Exception as e:
return f"Error accessing LinkedIn: {str(e)}"
def scrape_linkedin_company(self, linkedin_url):
"""Alias for scrape_linkedin_profile - for compatibility"""
return self.scrape_linkedin_profile(linkedin_url)
def scrape_company_data(self, linkedin_url):
"""Another alias for compatibility"""
return self.scrape_linkedin_profile(linkedin_url)
def scrape_company_website(self, company_name):
"""Scrape company website as fallback"""
try:
# Try to construct company website URL
company_clean = re.sub(r'[^\w\s-]', '', company_name.lower())
company_clean = re.sub(r'\s+', '', company_clean)
possible_urls = [
f"https://{company_clean}.com",
f"https://www.{company_clean}.com",
f"https://{company_clean}.org",
f"https://www.{company_clean}.org"
]
for url in possible_urls:
cached_content = self._get_cached_data(url)
if cached_content:
return cached_content
try:
if self.use_selenium:
content = self.scrape_with_selenium(url)
else:
content = self.scrape_with_requests(url)
if "Error" not in content and len(content) > 50:
self._cache_data(url, content)
return content
                except Exception:
continue
return f"Could not find website for {company_name}"
except Exception as e:
return f"Error finding company website: {str(e)}"
def scrape_linkedin_or_company(self, linkedin_url, company_name):
"""Main method to scrape LinkedIn or fallback to company website"""
# First try LinkedIn
if linkedin_url and linkedin_url.strip():
linkedin_content = self.scrape_linkedin_profile(linkedin_url)
if "Error" not in linkedin_content and len(linkedin_content) > 50:
return f"LinkedIn: {linkedin_content}"
# Fallback to company website
company_content = self.scrape_company_website(company_name)
return f"Company Website: {company_content}"
def __del__(self):
"""Clean up Selenium driver"""
if hasattr(self, 'driver'):
try:
self.driver.quit()
            except Exception:
pass
# Standalone function for easy import
def scrape_company_info(input_data):
"""
    Scrape company information from a LinkedIn URL or a company name.
Args:
input_data (str): LinkedIn URL or company name
Returns:
str: Scraped company information or error message if dependencies missing
"""
if not SELENIUM_AVAILABLE:
return "Company research feature requires additional setup. Please install selenium and webdriver-manager for enterprise features."
try:
scraper = LinkedInScraper()
        # Check whether the input is a LinkedIn URL
        if 'linkedin.com' in input_data.lower():
            # No separate company name is available here, so scrape the profile directly
            result = scraper.scrape_linkedin_profile(input_data)
else:
# Treat as company name
result = scraper.scrape_company_website(input_data)
return result if result else ""
except Exception as e:
print(f"Error in scrape_company_info: {e}")
return ""