#!/usr/bin/env python3
"""
Enhanced Email Scraper with Intelligent Caching
"""
import os
import imaplib
import json
from email import message_from_bytes
from email.utils import parsedate_to_datetime
from datetime import datetime, timedelta
from typing import List, Dict
from zoneinfo import ZoneInfo

from bs4 import BeautifulSoup
from dotenv import load_dotenv

load_dotenv()

# Email credentials
APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")

EMAIL_DB_FILE = "email_db.json"
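
# The credentials above are read from a .env file next to this script. A minimal
# sketch of the assumed layout (the address and password below are placeholders,
# not values shipped with this code):
#
#   [email protected]
#   APP_PASSWORD=sixteen-char-gmail-app-password
#
# Gmail requires an app password here (generated under Google Account > Security),
# not the normal account password.
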
def _imap_connect():
    """Connect to the Gmail IMAP server and select the All Mail folder."""
    try:
        mail = imaplib.IMAP4_SSL("imap.gmail.com")
        mail.login(EMAIL_ID, APP_PASSWORD)
        mail.select('"[Gmail]/All Mail"')
        return mail
    except Exception as e:
        print(f"IMAP connection failed: {e}")
        raise

def _email_to_clean_text(msg):
    """Extract clean text from an email message, preferring the HTML part."""
    # Collect HTML and plain-text parts; HTML wins if both are present
    html_content = None
    text_content = None

    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == "text/html":
                try:
                    html_content = part.get_payload(decode=True).decode(errors="ignore")
                except Exception:
                    continue
            elif content_type == "text/plain":
                try:
                    text_content = part.get_payload(decode=True).decode(errors="ignore")
                except Exception:
                    continue
    else:
        # Non-multipart message
        content_type = msg.get_content_type()
        try:
            content = msg.get_payload(decode=True).decode(errors="ignore")
            if content_type == "text/html":
                html_content = content
            else:
                text_content = content
        except Exception:
            pass

    # Clean HTML content: drop script/style elements, then collapse to plain text
    if html_content:
        soup = BeautifulSoup(html_content, "html.parser")
        for script in soup(["script", "style"]):
            script.decompose()
        return soup.get_text(separator=' ', strip=True)
    elif text_content:
        return text_content.strip()
    else:
        return ""

def _load_email_db() -> Dict:
    """Load the email database from disk."""
    if not os.path.exists(EMAIL_DB_FILE):
        return {}
    try:
        with open(EMAIL_DB_FILE, "r") as f:
            return json.load(f)
    except (json.JSONDecodeError, IOError):
        print(f"Warning: Could not load {EMAIL_DB_FILE}, starting with an empty database")
        return {}

def _save_email_db(db: Dict):
    """Save the email database to disk."""
    try:
        with open(EMAIL_DB_FILE, "w") as f:
            json.dump(db, f, indent=2)
    except IOError as e:
        print(f"Error saving database: {e}")
        raise

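
# Shape of email_db.json as written by the functions above (an illustrative sketch
# inferred from the code, with a placeholder sender address):
#
#   {
#     "[email protected]": {
#       "last_scraped": "05-Jun-2025",
#       "emails": [
#         {"date": "03-Jun-2025", "time": "14:05:11", "subject": "...",
#          "content": "...", "message_id": "<...>"}
#       ]
#     }
#   }
#
# All dates use the DD-MMM-YYYY format produced by strftime("%d-%b-%Y").
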
def _date_to_imap_format(date_str: str) -> str:
    """Validate a DD-MMM-YYYY date string and return it in IMAP-compatible form."""
    try:
        dt = datetime.strptime(date_str, "%d-%b-%Y")
        return dt.strftime("%d-%b-%Y")
    except ValueError:
        raise ValueError(f"Invalid date format: {date_str}. Expected DD-MMM-YYYY")

def _is_date_in_range(email_date: str, start_date: str, end_date: str) -> bool:
    """Check whether an email date (DD-MMM-YYYY) falls within the given range, inclusive."""
    try:
        email_dt = datetime.strptime(email_date, "%d-%b-%Y")
        start_dt = datetime.strptime(start_date, "%d-%b-%Y")
        end_dt = datetime.strptime(end_date, "%d-%b-%Y")
        return start_dt <= email_dt <= end_dt
    except ValueError:
        return False

def scrape_emails_from_sender(sender_email: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails from a specific sender within a date range.
    Uses intelligent caching to avoid re-scraping.
    """
    print(f"Scraping emails from {sender_email} between {start_date} and {end_date}")

    # Load existing database
    db = _load_email_db()
    sender_email = sender_email.lower().strip()

    # Check whether we already have cached emails for this sender
    if sender_email in db:
        cached_emails = db[sender_email].get("emails", [])

        # Filter cached emails by date range
        filtered_emails = [
            email for email in cached_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]

        # Only skip scraping if the cache was already refreshed today
        last_scraped = db[sender_email].get("last_scraped", "01-Jan-2020")
        today = datetime.today().strftime("%d-%b-%Y")
        if last_scraped == today and filtered_emails:
            print(f"Using cached emails (last scraped: {last_scraped})")
            return filtered_emails

    # Need to scrape emails
    try:
        mail = _imap_connect()

        # Prepare IMAP search criteria
        start_imap = _date_to_imap_format(start_date)
        # Add one day to end_date for the BEFORE criterion (IMAP BEFORE is exclusive)
        end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
        end_imap = end_dt.strftime("%d-%b-%Y")
        search_criteria = f'(FROM "{sender_email}") SINCE "{start_imap}" BEFORE "{end_imap}"'
        print(f"IMAP search: {search_criteria}")

        # Search for emails
        status, data = mail.search(None, search_criteria)
        if status != 'OK':
            raise Exception(f"IMAP search failed: {status}")

        email_ids = data[0].split()
        print(f"Found {len(email_ids)} emails")

        scraped_emails = []

        # Process each email
        for i, email_id in enumerate(email_ids):
            try:
                print(f"Processing email {i + 1}/{len(email_ids)}")

                # Fetch the full message
                status, msg_data = mail.fetch(email_id, "(RFC822)")
                if status != 'OK':
                    continue

                # Parse the raw message bytes
                msg = message_from_bytes(msg_data[0][1])

                # Extract subject and body text
                subject = msg.get("Subject", "No Subject")
                content = _email_to_clean_text(msg)

                # Parse the Date header, falling back to today if it is missing or malformed
                date_header = msg.get("Date", "")
                if date_header:
                    try:
                        dt_obj = parsedate_to_datetime(date_header)
                        # Convert to IST
                        ist_dt = dt_obj.astimezone(ZoneInfo("Asia/Kolkata"))
                        email_date = ist_dt.strftime("%d-%b-%Y")
                        email_time = ist_dt.strftime("%H:%M:%S")
                    except Exception:
                        email_date = datetime.today().strftime("%d-%b-%Y")
                        email_time = "00:00:00"
                else:
                    email_date = datetime.today().strftime("%d-%b-%Y")
                    email_time = "00:00:00"

                # Get the Message-ID for deduplication
                message_id = msg.get("Message-ID", f"missing-{email_id.decode()}")

                scraped_emails.append({
                    "date": email_date,
                    "time": email_time,
                    "subject": subject,
                    "content": content[:2000],  # Limit content length
                    "message_id": message_id
                })
            except Exception as e:
                print(f"Error processing email {email_id}: {e}")
                continue

        mail.logout()

        # Update database
        if sender_email not in db:
            db[sender_email] = {"emails": [], "last_scraped": ""}

        # Merge with existing emails (avoid duplicates)
        existing_emails = db[sender_email].get("emails", [])
        existing_ids = {email.get("message_id") for email in existing_emails}
        new_emails = [
            email for email in scraped_emails
            if email["message_id"] not in existing_ids
        ]

        db[sender_email]["emails"] = existing_emails + new_emails
        db[sender_email]["last_scraped"] = datetime.today().strftime("%d-%b-%Y")

        # Save database
        _save_email_db(db)

        # Return only the emails that fall inside the requested date range
        all_emails = db[sender_email]["emails"]
        filtered_emails = [
            email for email in all_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]
        print(f"Scraped {len(new_emails)} new emails, returning {len(filtered_emails)} in date range")
        return filtered_emails

    except Exception as e:
        print(f"Email scraping failed: {e}")
        raise

# Test the scraper
if __name__ == "__main__":
    # Test scraping
    try:
        emails = scrape_emails_from_sender(
            "[email protected]",
            "01-Jun-2025",
            "07-Jun-2025"
        )
        print(f"\nFound {len(emails)} emails:")
        for email in emails[:3]:  # Show first 3
            print(f"- {email['date']} {email['time']}: {email['subject']}")
    except Exception as e:
        print(f"Test failed: {e}")