""" |
|
Enhanced Email Scraper with Intelligent Caching |
|
""" |
|
|
|
import os
import imaplib
import json
from datetime import datetime, timedelta
from email import message_from_bytes
from email.utils import parsedate_to_datetime
from typing import List, Dict
from zoneinfo import ZoneInfo

from bs4 import BeautifulSoup
from dotenv import load_dotenv

load_dotenv()

APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")
EMAIL_DB_FILE = "email_db.json"
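
# Hypothetical .env alongside this script (names match the os.getenv calls
# above; the values shown are placeholders):
#
#   [email protected]
#   APP_PASSWORD=abcd efgh ijkl mnop   # Gmail App Password, not the account password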
|
|
|
def _imap_connect():
    """Connect to the Gmail IMAP server and select the All Mail folder."""
    if not EMAIL_ID or not APP_PASSWORD:
        raise RuntimeError("EMAIL_ID and APP_PASSWORD must be set in the environment")
    try:
        mail = imaplib.IMAP4_SSL("imap.gmail.com")
        mail.login(EMAIL_ID, APP_PASSWORD)
        # "[Gmail]/All Mail" also covers archived mail, not just the inbox.
        mail.select('"[Gmail]/All Mail"')
        return mail
    except Exception as e:
        print(f"IMAP connection failed: {e}")
        raise
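
# Note: Gmail no longer accepts regular account passwords over IMAP; the
# login() above expects an App Password, which in turn requires 2-Step
# Verification to be enabled on the account.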
|
|
|
def _email_to_clean_text(msg):
    """Extract clean, readable text from an email.message.Message."""
    html_content = None
    text_content = None

    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type not in ("text/html", "text/plain"):
                continue
            payload = part.get_payload(decode=True)
            if payload is None:  # e.g. a multipart container with no body
                continue
            charset = part.get_content_charset() or "utf-8"
            try:
                decoded = payload.decode(charset, errors="ignore")
            except LookupError:  # the header declared an unknown charset
                decoded = payload.decode("utf-8", errors="ignore")
            if content_type == "text/html":
                html_content = decoded
            else:
                text_content = decoded
    else:
        payload = msg.get_payload(decode=True)
        if payload is not None:
            charset = msg.get_content_charset() or "utf-8"
            try:
                content = payload.decode(charset, errors="ignore")
            except LookupError:
                content = payload.decode("utf-8", errors="ignore")
            if msg.get_content_type() == "text/html":
                html_content = content
            else:
                text_content = content

    # Prefer the HTML part: drop <script>/<style>, then flatten to text.
    if html_content:
        soup = BeautifulSoup(html_content, "html.parser")
        for tag in soup(["script", "style"]):
            tag.decompose()
        return soup.get_text(separator=' ', strip=True)
    if text_content:
        return text_content.strip()
    return ""
|
|
|
def _load_email_db() -> Dict:
    """Load the email database from disk, returning {} if absent or unreadable."""
    if not os.path.exists(EMAIL_DB_FILE):
        return {}
    try:
        with open(EMAIL_DB_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except (json.JSONDecodeError, IOError):
        print(f"Warning: could not load {EMAIL_DB_FILE}, starting with an empty database")
        return {}
|
|
|
def _save_email_db(db: Dict):
    """Save the email database to disk."""
    try:
        with open(EMAIL_DB_FILE, "w", encoding="utf-8") as f:
            json.dump(db, f, indent=2)
    except IOError as e:
        print(f"Error saving database: {e}")
        raise
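
# Shape of EMAIL_DB_FILE as read/written by the two helpers above (inferred
# from the code in scrape_emails_from_sender; keys are lower-cased sender
# addresses, dates are DD-MMM-YYYY):
#
#   {
#     "[email protected]": {
#       "emails": [
#         {"date": "01-Jun-2025", "time": "09:30:00", "subject": "...",
#          "content": "...", "message_id": "<...>"}
#       ],
#       "last_scraped": "07-Jun-2025"
#     }
#   }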
|
|
|
def _date_to_imap_format(date_str: str) -> str:
    """Validate a DD-MMM-YYYY date string (already the IMAP date format).

    The value is parsed and re-emitted unchanged; this is purely a format check.
    """
    try:
        dt = datetime.strptime(date_str, "%d-%b-%Y")
        return dt.strftime("%d-%b-%Y")
    except ValueError:
        raise ValueError(f"Invalid date format: {date_str}. Expected DD-MMM-YYYY")
|
|
|
def _is_date_in_range(email_date: str, start_date: str, end_date: str) -> bool:
    """Check if an email date (DD-MMM-YYYY) falls within the inclusive range."""
    try:
        email_dt = datetime.strptime(email_date, "%d-%b-%Y")
        start_dt = datetime.strptime(start_date, "%d-%b-%Y")
        end_dt = datetime.strptime(end_date, "%d-%b-%Y")
        return start_dt <= email_dt <= end_dt
    except ValueError:
        return False
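
# Example: _is_date_in_range("03-Jun-2025", "01-Jun-2025", "07-Jun-2025")
# returns True (the range is inclusive on both ends); any value that fails
# to parse as DD-MMM-YYYY returns False.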
|
|
|
def scrape_emails_from_sender(sender_email: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails from a specific sender within a date range (DD-MMM-YYYY).
    Uses intelligent caching to avoid re-scraping a sender already fetched today.
    """
    print(f"Scraping emails from {sender_email} between {start_date} and {end_date}")

    db = _load_email_db()
    sender_email = sender_email.lower().strip()

    # Serve from the cache when this sender was already scraped today.
    if sender_email in db:
        cached_emails = db[sender_email].get("emails", [])
        filtered_emails = [
            email for email in cached_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]
        last_scraped = db[sender_email].get("last_scraped", "01-Jan-2020")
        today = datetime.today().strftime("%d-%b-%Y")
        if last_scraped == today and filtered_emails:
            print(f"Using cached emails (last scraped: {last_scraped})")
            return filtered_emails

    try:
        mail = _imap_connect()
        try:
            start_imap = _date_to_imap_format(start_date)
            # IMAP BEFORE is exclusive, so search one day past end_date.
            end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
            end_imap = end_dt.strftime("%d-%b-%Y")

            # e.g. (FROM "[email protected]") SINCE "01-Jun-2025" BEFORE "08-Jun-2025"
            search_criteria = f'(FROM "{sender_email}") SINCE "{start_imap}" BEFORE "{end_imap}"'
            print(f"IMAP search: {search_criteria}")

            status, data = mail.search(None, search_criteria)
            if status != 'OK':
                raise RuntimeError(f"IMAP search failed: {status}")

            email_ids = data[0].split()
            print(f"Found {len(email_ids)} emails")

            scraped_emails = []
            for i, email_id in enumerate(email_ids):
                try:
                    print(f"Processing email {i + 1}/{len(email_ids)}")

                    status, msg_data = mail.fetch(email_id, "(RFC822)")
                    if status != 'OK':
                        continue

                    msg = message_from_bytes(msg_data[0][1])
                    # NOTE: the subject is kept as the raw header value and may
                    # still be RFC 2047 encoded.
                    subject = msg.get("Subject", "No Subject")
                    content = _email_to_clean_text(msg)

                    # Normalize the Date header to IST; fall back to today if
                    # the header is missing or unparseable.
                    try:
                        dt_obj = parsedate_to_datetime(msg.get("Date", ""))
                        ist_dt = dt_obj.astimezone(ZoneInfo("Asia/Kolkata"))
                        email_date = ist_dt.strftime("%d-%b-%Y")
                        email_time = ist_dt.strftime("%H:%M:%S")
                    except (TypeError, ValueError):
                        email_date = datetime.today().strftime("%d-%b-%Y")
                        email_time = "00:00:00"

                    message_id = msg.get("Message-ID", f"missing-{email_id.decode()}")

                    scraped_emails.append({
                        "date": email_date,
                        "time": email_time,
                        "subject": subject,
                        "content": content[:2000],  # cap stored body at 2000 chars
                        "message_id": message_id
                    })
                except Exception as e:
                    print(f"Error processing email {email_id}: {e}")
                    continue
        finally:
            # Always close the connection, even if search/fetch failed midway.
            mail.logout()

        if sender_email not in db:
            db[sender_email] = {"emails": [], "last_scraped": ""}

        # Merge into the cache, de-duplicating on Message-ID.
        existing_emails = db[sender_email].get("emails", [])
        existing_ids = {email.get("message_id") for email in existing_emails}
        new_emails = [
            email for email in scraped_emails
            if email["message_id"] not in existing_ids
        ]

        db[sender_email]["emails"] = existing_emails + new_emails
        db[sender_email]["last_scraped"] = datetime.today().strftime("%d-%b-%Y")
        _save_email_db(db)

        # Return everything in range, cached and newly scraped alike.
        all_emails = db[sender_email]["emails"]
        filtered_emails = [
            email for email in all_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]

        print(f"Scraped {len(new_emails)} new emails, returning {len(filtered_emails)} in date range")
        return filtered_emails

    except Exception as e:
        print(f"Email scraping failed: {e}")
        raise
|
|
|
|
|
if __name__ == "__main__":
    # Quick smoke test: one sender, a one-week window.
    try:
        emails = scrape_emails_from_sender(
            "[email protected]",
            "01-Jun-2025",
            "07-Jun-2025"
        )
        print(f"\nFound {len(emails)} emails:")
        for email in emails[:3]:
            print(f"- {email['date']} {email['time']}: {email['subject']}")
    except Exception as e:
        print(f"Test failed: {e}")