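"""LinkedIn and company-website scraper with optional Selenium rendering and an SQLite-backed cache."""
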
import requests
from bs4 import BeautifulSoup
import time
import re
from urllib.parse import urlparse, urljoin
import sqlite3

# Optional Selenium imports for advanced scraping
try:
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.chrome.service import Service
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from webdriver_manager.chrome import ChromeDriverManager
    SELENIUM_AVAILABLE = True
except ImportError:
    SELENIUM_AVAILABLE = False
    print("⚠️ Selenium not available. Company research will use basic scraping only.")

class LinkedInScraper:
    def __init__(self, timeout=10, use_selenium=False):
        self.timeout = timeout
        self.use_selenium = use_selenium
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })
        if self.use_selenium and SELENIUM_AVAILABLE:
            self._setup_selenium()
        elif self.use_selenium and not SELENIUM_AVAILABLE:
            print("⚠️ Selenium requested but not available. Falling back to basic scraping.")
            # Actually fall back, so later calls use the requests-based path
            self.use_selenium = False

    def _setup_selenium(self):
        """Set up a headless Chrome WebDriver."""
        if not SELENIUM_AVAILABLE:
            print("⚠️ Selenium not available. Cannot set up WebDriver.")
            return
        try:
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-gpu')
            chrome_options.add_argument('--window-size=1920,1080')
            # Selenium 4 takes the driver path via a Service object rather than
            # as the first positional argument.
            self.driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()),
                options=chrome_options
            )
        except Exception as e:
            print(f"Error setting up Selenium: {e}")
            self.use_selenium = False

    def _get_cached_data(self, url):
        """Return cached content for a URL, or None if not cached."""
        try:
            conn = sqlite3.connect('leads.db')
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS scraped_cache (
                    url TEXT PRIMARY KEY,
                    content TEXT,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('SELECT content FROM scraped_cache WHERE url = ?', (url,))
            result = cursor.fetchone()
            conn.close()
            return result[0] if result else None
        except Exception as e:
            print(f"Cache error: {e}")
            return None

    def _cache_data(self, url, content):
        """Cache scraped data."""
        try:
            conn = sqlite3.connect('leads.db')
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO scraped_cache (url, content)
                VALUES (?, ?)
            ''', (url, content))
            conn.commit()
            conn.close()
        except Exception as e:
            print(f"Cache save error: {e}")

    def scrape_with_requests(self, url):
        """Scrape a URL using requests and BeautifulSoup."""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            content_parts = []

            # Meta description
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                content_parts.append(f"Description: {meta_desc.get('content', '')}")

            # Page title
            title = soup.find('title')
            if title:
                content_parts.append(f"Title: {title.get_text().strip()}")

            # About section or main content
            about_selectors = [
                '.about-section',
                '.company-description',
                '.about-us',
                '[class*="about"]',
                '.description',
                '.summary',
                'main',
                '.content'
            ]
            found_section = False
            for selector in about_selectors:
                elements = soup.select(selector)
                for element in elements:
                    text = element.get_text().strip()
                    if len(text) > 50:  # Only meaningful content
                        content_parts.append(text[:500])  # Limit length
                        found_section = True
                        break
                # Stop once a selector has yielded a section (the title/description
                # collected above should not short-circuit this search)
                if found_section:
                    break

            # If no specific content found, fall back to the first paragraphs
            if not content_parts:
                paragraphs = soup.find_all('p')
                for p in paragraphs[:3]:  # First 3 paragraphs
                    text = p.get_text().strip()
                    if len(text) > 30:
                        content_parts.append(text[:300])

            return ' | '.join(content_parts) if content_parts else "No content extracted"
        except Exception as e:
            return f"Error scraping {url}: {str(e)}"

    def scrape_with_selenium(self, url):
        """Scrape a URL using Selenium."""
        try:
            self.driver.get(url)
            WebDriverWait(self.driver, self.timeout).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            # Wait a bit for dynamic content
            time.sleep(2)
            content_parts = []

            # Try LinkedIn-specific selectors first
            linkedin_selectors = [
                '[data-test-id="about-us-description"]',
                '.company-about-us-description',
                '.about-section',
                '[class*="about"]'
            ]
            for selector in linkedin_selectors:
                try:
                    elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                    for element in elements:
                        text = element.text.strip()
                        if len(text) > 50:
                            content_parts.append(text[:500])
                            break
                except Exception:
                    continue

            # If no LinkedIn-specific content was found, try general selectors
            if not content_parts:
                general_selectors = ['main', '.content', 'article', '.description']
                for selector in general_selectors:
                    try:
                        elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                        for element in elements:
                            text = element.text.strip()
                            if len(text) > 50:
                                content_parts.append(text[:500])
                                break
                    except Exception:
                        continue

            return ' | '.join(content_parts) if content_parts else "No content extracted"
        except Exception as e:
            return f"Error scraping {url} with Selenium: {str(e)}"

    def scrape_linkedin_profile(self, linkedin_url):
        """Scrape a LinkedIn company profile."""
        if not linkedin_url or not linkedin_url.strip():
            return "No LinkedIn URL provided"

        # Normalize the URL first so the cache lookup and cache write use the same key
        linkedin_url = linkedin_url.strip()
        if not linkedin_url.startswith('http'):
            linkedin_url = 'https://' + linkedin_url

        # Check the cache first
        cached_content = self._get_cached_data(linkedin_url)
        if cached_content:
            return cached_content

        try:
            # Use the appropriate scraping method
            if self.use_selenium:
                content = self.scrape_with_selenium(linkedin_url)
            else:
                content = self.scrape_with_requests(linkedin_url)

            # Cache the result
            self._cache_data(linkedin_url, content)
            return content
        except Exception as e:
            return f"Error accessing LinkedIn: {str(e)}"

    def scrape_linkedin_company(self, linkedin_url):
        """Alias for scrape_linkedin_profile, kept for compatibility."""
        return self.scrape_linkedin_profile(linkedin_url)

    def scrape_company_data(self, linkedin_url):
        """Another compatibility alias for scrape_linkedin_profile."""
        return self.scrape_linkedin_profile(linkedin_url)

    def scrape_company_website(self, company_name):
        """Scrape the company website as a fallback."""
        try:
            # Guess likely website URLs from the company name
            company_clean = re.sub(r'[^\w\s-]', '', company_name.lower())
            company_clean = re.sub(r'\s+', '', company_clean)
            possible_urls = [
                f"https://{company_clean}.com",
                f"https://www.{company_clean}.com",
                f"https://{company_clean}.org",
                f"https://www.{company_clean}.org"
            ]
            for url in possible_urls:
                cached_content = self._get_cached_data(url)
                if cached_content:
                    return cached_content
                try:
                    if self.use_selenium:
                        content = self.scrape_with_selenium(url)
                    else:
                        content = self.scrape_with_requests(url)
                    if "Error" not in content and len(content) > 50:
                        self._cache_data(url, content)
                        return content
                except Exception:
                    continue
            return f"Could not find website for {company_name}"
        except Exception as e:
            return f"Error finding company website: {str(e)}"

    def scrape_linkedin_or_company(self, linkedin_url, company_name):
        """Main entry point: scrape LinkedIn, falling back to the company website."""
        # First try LinkedIn
        if linkedin_url and linkedin_url.strip():
            linkedin_content = self.scrape_linkedin_profile(linkedin_url)
            if "Error" not in linkedin_content and len(linkedin_content) > 50:
                return f"LinkedIn: {linkedin_content}"

        # Fall back to the company website
        company_content = self.scrape_company_website(company_name)
        return f"Company Website: {company_content}"

    def __del__(self):
        """Clean up the Selenium driver."""
        if hasattr(self, 'driver'):
            try:
                self.driver.quit()
            except Exception:
                pass

# Standalone function for easy import
def scrape_company_info(input_data):
    """
    Scrape company information from a LinkedIn URL or company name.

    Args:
        input_data (str): LinkedIn URL or company name

    Returns:
        str: Scraped company information, or an error message if dependencies are missing
    """
    if not SELENIUM_AVAILABLE:
        return "Company research feature requires additional setup. Please install selenium and webdriver-manager for enterprise features."
    try:
        scraper = LinkedInScraper()
        # Check whether the input is a LinkedIn URL
        if 'linkedin.com' in input_data.lower():
            result = scraper.scrape_linkedin_or_company(input_data, "")
        else:
            # Treat it as a company name
            result = scraper.scrape_company_website(input_data)
        return result if result else ""
    except Exception as e:
        print(f"Error in scrape_company_info: {e}")
        return ""