Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
""" | |
Enhanced Email Scraper with Intelligent Caching | |
""" | |
import os | |
import imaplib | |
import json | |
from email import message_from_bytes | |
from bs4 import BeautifulSoup | |
from datetime import datetime, timedelta | |
from dotenv import load_dotenv | |
from zoneinfo import ZoneInfo | |
from email.utils import parsedate_to_datetime | |
from typing import List, Dict | |
# Pull credentials from a local .env file into the process environment.
load_dotenv()

# Gmail credentials read from the environment at import time.
APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")
print("EMAIL_ID: ", EMAIL_ID)

# On-disk JSON cache of previously scraped emails, keyed by sender.
EMAIL_DB_FILE = "email_db.json"
def validate_email_setup():
    """Check the .env file and credential environment variables.

    Prints a human-readable report of every problem found and returns
    True only when all checks pass.
    """
    print("=== Email Setup Validation ===")

    # Without a .env file nothing else can be configured — bail out early.
    env_present = os.path.exists('.env')
    print(f".env file exists: {'β Yes' if env_present else 'β No'}")
    if not env_present:
        print("β No .env file found! Create one with:")
        print(" [email protected]")
        print(" APP_PASSWORD=your_16_char_app_password")
        print(" OPENAI_API_KEY=your_openai_key")
        return False

    # Collect every credential problem instead of stopping at the first.
    issues = []

    if not EMAIL_ID:
        issues.append("EMAIL_ID not set or empty")
    elif '@' not in EMAIL_ID:
        issues.append("EMAIL_ID doesn't look like an email address")
    elif not EMAIL_ID.endswith('@gmail.com'):
        issues.append("EMAIL_ID should be a Gmail address (@gmail.com)")

    if not APP_PASSWORD:
        issues.append("APP_PASSWORD not set or empty")
    elif len(APP_PASSWORD) != 16:
        issues.append(f"APP_PASSWORD should be 16 characters, got {len(APP_PASSWORD)}")
    elif ' ' in APP_PASSWORD:
        issues.append("APP_PASSWORD should not contain spaces (remove spaces from app password)")

    if not os.getenv("OPENAI_API_KEY"):
        issues.append("OPENAI_API_KEY not set (needed for query processing)")

    if not issues:
        print("β All credentials look good!")
        return True

    print("β Issues found:")
    for issue in issues:
        print(f" - {issue}")
    return False
def _imap_connect():
    """Open an authenticated IMAP SSL session to Gmail and select All Mail.

    Returns the connected imaplib.IMAP4_SSL object. Prints diagnostic
    hints and re-raises on any connection, login, or select failure.
    """
    print("=== IMAP Connection Debug ===")

    # Report which credentials made it into the environment.
    print(f"EMAIL_ID loaded: {'β Yes' if EMAIL_ID else 'β No (None/Empty)'}")
    print(f"APP_PASSWORD loaded: {'β Yes' if APP_PASSWORD else 'β No (None/Empty)'}")
    if EMAIL_ID:
        domain = EMAIL_ID.split('@')[1] if '@' in EMAIL_ID else 'INVALID'
        print(f"Email ID: {EMAIL_ID[:5]}...@{domain}")
    if APP_PASSWORD:
        pw_len = len(APP_PASSWORD)
        print(f"App Password length: {pw_len} characters")
        print(f"App Password format: {'β Looks correct (16 chars)' if pw_len == 16 else f'β Expected 16 chars, got {pw_len}'}")

    if not (EMAIL_ID and APP_PASSWORD):
        error_msg = "Missing credentials in environment variables!"
        print(f"β {error_msg}")
        raise Exception(error_msg)

    try:
        print("π Attempting IMAP SSL connection to imap.gmail.com:993...")
        mail = imaplib.IMAP4_SSL("imap.gmail.com")
        print("β SSL connection established")

        print("π Attempting login...")
        result = mail.login(EMAIL_ID, APP_PASSWORD)
        print(f"β Login successful: {result}")

        print("π Selecting mailbox: [Gmail]/All Mail...")
        result = mail.select('"[Gmail]/All Mail"')
        print(f"β Mailbox selected: {result}")

        print("=== IMAP Connection Successful ===")
        return mail
    except imaplib.IMAP4.error as e:
        # Protocol-level failure: almost always a credential/setting issue.
        print(f"β IMAP Error: {e}")
        print("π‘ Possible causes:")
        print(" - App Password is incorrect or expired")
        print(" - 2FA not enabled on Gmail account")
        print(" - IMAP access not enabled in Gmail settings")
        print(" - Gmail account locked or requires security verification")
        raise
    except Exception as e:
        # Anything else (DNS, TLS, network) lands here.
        print(f"β Connection Error: {e}")
        print("π‘ Possible causes:")
        print(" - Network connectivity issues")
        print(" - Gmail IMAP server temporarily unavailable")
        print(" - Firewall blocking IMAP port 993")
        raise
def _email_to_clean_text(msg): | |
"""Extract clean text from email message""" | |
# Try HTML first | |
html_content = None | |
text_content = None | |
if msg.is_multipart(): | |
for part in msg.walk(): | |
content_type = part.get_content_type() | |
if content_type == "text/html": | |
try: | |
html_content = part.get_payload(decode=True).decode(errors="ignore") | |
except: | |
continue | |
elif content_type == "text/plain": | |
try: | |
text_content = part.get_payload(decode=True).decode(errors="ignore") | |
except: | |
continue | |
else: | |
# Non-multipart message | |
content_type = msg.get_content_type() | |
try: | |
content = msg.get_payload(decode=True).decode(errors="ignore") | |
if content_type == "text/html": | |
html_content = content | |
else: | |
text_content = content | |
except: | |
pass | |
# Clean HTML content | |
if html_content: | |
soup = BeautifulSoup(html_content, "html.parser") | |
# Remove script and style elements | |
for script in soup(["script", "style"]): | |
script.decompose() | |
return soup.get_text(separator=' ', strip=True) | |
elif text_content: | |
return text_content.strip() | |
else: | |
return "" | |
def _load_email_db() -> Dict:
    """Load the on-disk email cache.

    A missing file yields an empty database silently; a corrupt or
    unreadable file yields an empty database with a warning.
    """
    try:
        with open(EMAIL_DB_FILE, "r") as handle:
            return json.load(handle)
    except FileNotFoundError:
        # No cache yet — normal on first run.
        return {}
    except (json.JSONDecodeError, IOError):
        print(f"Warning: Could not load {EMAIL_DB_FILE}, starting with empty database")
        return {}
def _save_email_db(db: Dict):
    """Persist the email cache to EMAIL_DB_FILE as indented JSON.

    Logs and re-raises IOError so callers see the failure.
    """
    try:
        with open(EMAIL_DB_FILE, "w") as handle:
            json.dump(db, handle, indent=2)
    except IOError as exc:
        print(f"Error saving database: {exc}")
        raise
def _date_to_imap_format(date_str: str) -> str: | |
"""Convert DD-MMM-YYYY to IMAP date format""" | |
try: | |
dt = datetime.strptime(date_str, "%d-%b-%Y") | |
return dt.strftime("%d-%b-%Y") | |
except ValueError: | |
raise ValueError(f"Invalid date format: {date_str}. Expected DD-MMM-YYYY") | |
def _is_date_in_range(email_date: str, start_date: str, end_date: str) -> bool: | |
"""Check if email date is within the specified range""" | |
try: | |
email_dt = datetime.strptime(email_date, "%d-%b-%Y") | |
start_dt = datetime.strptime(start_date, "%d-%b-%Y") | |
end_dt = datetime.strptime(end_date, "%d-%b-%Y") | |
return start_dt <= email_dt <= end_dt | |
except ValueError: | |
return False | |
def scrape_emails_from_sender(sender_email: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails from a specific sender within a date range.

    Dates use DD-MMM-YYYY. Results are cached per sender in EMAIL_DB_FILE;
    when the cache was already refreshed today, cached results are returned
    without contacting IMAP.

    Returns a list of dicts with keys: date, time, subject, content,
    message_id. Raises on IMAP connection or search failure.
    """
    print(f"Scraping emails from {sender_email} between {start_date} and {end_date}")

    db = _load_email_db()
    sender_email = sender_email.lower().strip()

    # Serve straight from cache when this sender was already scraped today.
    if sender_email in db:
        cached_emails = db[sender_email].get("emails", [])
        filtered_emails = [
            email for email in cached_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]
        last_scraped = db[sender_email].get("last_scraped", "01-Jan-2020")
        today = datetime.today().strftime("%d-%b-%Y")
        if last_scraped == today and filtered_emails:
            print(f"Using cached emails (last scraped: {last_scraped})")
            return filtered_emails

    def _ist_date_time(message):
        # Convert the Date: header to IST; fall back to today / midnight
        # when the header is absent or unparsable.
        header = message.get("Date", "")
        if header:
            try:
                ist_dt = parsedate_to_datetime(header).astimezone(ZoneInfo("Asia/Kolkata"))
                return ist_dt.strftime("%d-%b-%Y"), ist_dt.strftime("%H:%M:%S")
            except Exception:
                pass
        return datetime.today().strftime("%d-%b-%Y"), "00:00:00"

    mail = None
    try:
        mail = _imap_connect()

        start_imap = _date_to_imap_format(start_date)
        # IMAP BEFORE is exclusive, so search one day past end_date.
        end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
        end_imap = end_dt.strftime("%d-%b-%Y")
        search_criteria = f'(FROM "{sender_email}") SINCE "{start_imap}" BEFORE "{end_imap}"'
        print(f"IMAP search: {search_criteria}")

        status, data = mail.search(None, search_criteria)
        if status != 'OK':
            raise Exception(f"IMAP search failed: {status}")

        email_ids = data[0].split()
        print(f"Found {len(email_ids)} emails")

        scraped_emails = []
        for i, email_id in enumerate(email_ids):
            try:
                print(f"Processing email {i+1}/{len(email_ids)}")
                status, msg_data = mail.fetch(email_id, "(RFC822)")
                if status != 'OK':
                    continue

                msg = message_from_bytes(msg_data[0][1])
                subject = msg.get("Subject", "No Subject")
                content = _email_to_clean_text(msg)
                email_date, email_time = _ist_date_time(msg)
                # Message-ID drives deduplication against the cache.
                message_id = msg.get("Message-ID", f"missing-{email_id.decode()}")

                scraped_emails.append({
                    "date": email_date,
                    "time": email_time,
                    "subject": subject,
                    "content": content[:2000],  # cap stored content length
                    "message_id": message_id
                })
            except Exception as e:
                # One malformed message should not abort the whole run.
                print(f"Error processing email {email_id}: {e}")
                continue

        # Merge new results into the cache, skipping duplicates.
        if sender_email not in db:
            db[sender_email] = {"emails": [], "last_scraped": ""}
        existing_emails = db[sender_email].get("emails", [])
        existing_ids = {email.get("message_id") for email in existing_emails}
        new_emails = [
            email for email in scraped_emails
            if email["message_id"] not in existing_ids
        ]
        db[sender_email]["emails"] = existing_emails + new_emails
        db[sender_email]["last_scraped"] = datetime.today().strftime("%d-%b-%Y")
        _save_email_db(db)

        # Return the full cached set filtered to the requested range.
        all_emails = db[sender_email]["emails"]
        filtered_emails = [
            email for email in all_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]
        print(f"Scraped {len(new_emails)} new emails, returning {len(filtered_emails)} in date range")
        return filtered_emails
    except Exception as e:
        print(f"Email scraping failed: {e}")
        raise
    finally:
        # BUGFIX: the original only logged out on the success path, leaking
        # the IMAP connection whenever search/fetch/save raised.
        if mail is not None:
            try:
                mail.logout()
            except Exception:
                pass
def scrape_emails_by_text_search(keyword: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails containing *keyword* within a DD-MMM-YYYY date range.

    Runs three IMAP searches (FROM / SUBJECT / BODY), unions the hits,
    then re-verifies keyword presence and date range client-side since
    server-side search can over- or under-match.

    Returns dicts with keys: date, time, subject, from, content,
    message_id — sorted newest first. Raises when credential validation
    or the IMAP session fails.
    """
    print(f"Searching emails containing '{keyword}' between {start_date} and {end_date}")

    # Fail fast with a readable report when credentials are misconfigured.
    if not validate_email_setup():
        raise Exception("Email setup validation failed. Please check your .env file and credentials.")

    def _ist_date_time(message):
        # Convert the Date: header to IST; fall back to today / midnight
        # when the header is absent or unparsable.
        header = message.get("Date", "")
        if header:
            try:
                ist_dt = parsedate_to_datetime(header).astimezone(ZoneInfo("Asia/Kolkata"))
                return ist_dt.strftime("%d-%b-%Y"), ist_dt.strftime("%H:%M:%S")
            except Exception:
                pass
        return datetime.today().strftime("%d-%b-%Y"), "00:00:00"

    mail = None
    try:
        mail = _imap_connect()

        start_imap = _date_to_imap_format(start_date)
        # IMAP BEFORE is exclusive, so search one day past end_date.
        end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
        end_imap = end_dt.strftime("%d-%b-%Y")

        # Union of three searches so a match in any field is caught.
        search_criteria_list = [
            f'FROM "{keyword}" SINCE "{start_imap}" BEFORE "{end_imap}"',
            f'SUBJECT "{keyword}" SINCE "{start_imap}" BEFORE "{end_imap}"',
            f'BODY "{keyword}" SINCE "{start_imap}" BEFORE "{end_imap}"'
        ]

        all_email_ids = set()
        for search_criteria in search_criteria_list:
            try:
                print(f"IMAP search: {search_criteria}")
                status, data = mail.search(None, search_criteria)
                if status == 'OK' and data[0]:
                    email_ids = data[0].split()
                    all_email_ids.update(email_ids)
                    print(f"Found {len(email_ids)} emails with this criteria")
            except Exception as e:
                # A single failing criterion (e.g. BODY on some servers)
                # should not abort the other searches.
                print(f"Search criteria failed: {search_criteria}, error: {e}")
                continue

        print(f"Total unique emails found: {len(all_email_ids)}")

        scraped_emails = []
        keyword_lower = keyword.lower()  # hoisted: invariant across the loop
        for i, email_id in enumerate(all_email_ids):
            try:
                print(f"Processing email {i+1}/{len(all_email_ids)}")
                status, msg_data = mail.fetch(email_id, "(RFC822)")
                if status != 'OK':
                    continue

                msg = message_from_bytes(msg_data[0][1])
                subject = msg.get("Subject", "No Subject")
                from_header = msg.get("From", "Unknown Sender")
                content = _email_to_clean_text(msg)

                # Confirm the keyword really appears somewhere we care
                # about (case-insensitive) — IMAP search can over-match.
                if not any(keyword_lower in text.lower() for text in [subject, from_header, content]):
                    continue

                email_date, email_time = _ist_date_time(msg)
                # SINCE/BEFORE use server-side dates; re-check in IST.
                if not _is_date_in_range(email_date, start_date, end_date):
                    continue

                message_id = msg.get("Message-ID", f"missing-{email_id.decode()}")
                scraped_emails.append({
                    "date": email_date,
                    "time": email_time,
                    "subject": subject,
                    "from": from_header,
                    "content": content[:2000],  # cap stored content length
                    "message_id": message_id
                })
            except Exception as e:
                print(f"Error processing email {email_id}: {e}")
                continue

        # Newest first.
        scraped_emails.sort(
            key=lambda x: datetime.strptime(f"{x['date']} {x['time']}", "%d-%b-%Y %H:%M:%S"),
            reverse=True,
        )
        print(f"Successfully processed {len(scraped_emails)} emails containing '{keyword}'")
        return scraped_emails
    except Exception as e:
        print(f"Email text search failed: {e}")
        raise
    finally:
        # BUGFIX: the original only logged out on the success path, leaking
        # the IMAP connection whenever a search or fetch raised.
        if mail is not None:
            try:
                mail.logout()
            except Exception:
                pass
# Manual smoke test: scrape a known sender over a one-week window and
# preview at most the first three results.
if __name__ == "__main__":
    try:
        results = scrape_emails_from_sender(
            "[email protected]",
            "01-Jun-2025",
            "07-Jun-2025"
        )
        print(f"\nFound {len(results)} emails:")
        for item in results[:3]:
            print(f"- {item['date']} {item['time']}: {item['subject']}")
    except Exception as e:
        print(f"Test failed: {e}")