#!/usr/bin/env python3
"""
Enhanced Email Scraper with Intelligent Caching
"""
import os
import imaplib
import json
from email import message_from_bytes
from email.utils import parsedate_to_datetime
from datetime import datetime, timedelta
from typing import List, Dict
from zoneinfo import ZoneInfo

from bs4 import BeautifulSoup
from dotenv import load_dotenv

load_dotenv()

# Email credentials
APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")

EMAIL_DB_FILE = "email_db.json"


def _imap_connect():
    """Connect to the Gmail IMAP server."""
    try:
        mail = imaplib.IMAP4_SSL("imap.gmail.com")
        mail.login(EMAIL_ID, APP_PASSWORD)
        mail.select('"[Gmail]/All Mail"')
        return mail
    except Exception as e:
        print(f"IMAP connection failed: {e}")
        raise


def _email_to_clean_text(msg):
    """Extract clean text from an email message."""
    # Prefer HTML (richer structure), fall back to plain text
    html_content = None
    text_content = None

    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == "text/html":
                try:
                    html_content = part.get_payload(decode=True).decode(errors="ignore")
                except Exception:
                    continue
            elif content_type == "text/plain":
                try:
                    text_content = part.get_payload(decode=True).decode(errors="ignore")
                except Exception:
                    continue
    else:
        # Non-multipart message
        content_type = msg.get_content_type()
        try:
            content = msg.get_payload(decode=True).decode(errors="ignore")
            if content_type == "text/html":
                html_content = content
            else:
                text_content = content
        except Exception:
            pass

    # Clean HTML content
    if html_content:
        soup = BeautifulSoup(html_content, "html.parser")
        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.decompose()
        return soup.get_text(separator=" ", strip=True)
    elif text_content:
        return text_content.strip()
    else:
        return ""


def _load_email_db() -> Dict:
    """Load the email database from file."""
    if not os.path.exists(EMAIL_DB_FILE):
        return {}
    try:
        with open(EMAIL_DB_FILE, "r") as f:
            return json.load(f)
    except (json.JSONDecodeError, IOError):
        print(f"Warning: Could not load {EMAIL_DB_FILE}, starting with empty database")
        return {}


def _save_email_db(db: Dict):
    """Save the email database to file."""
    try:
        with open(EMAIL_DB_FILE, "w") as f:
            json.dump(db, f, indent=2)
    except IOError as e:
        print(f"Error saving database: {e}")
        raise


def _date_to_imap_format(date_str: str) -> str:
    """Validate a DD-MMM-YYYY date string and return it in IMAP date format."""
    try:
        dt = datetime.strptime(date_str, "%d-%b-%Y")
        return dt.strftime("%d-%b-%Y")
    except ValueError:
        raise ValueError(f"Invalid date format: {date_str}. Expected DD-MMM-YYYY")


def _is_date_in_range(email_date: str, start_date: str, end_date: str) -> bool:
    """Check if an email date falls within the specified range (inclusive)."""
    try:
        email_dt = datetime.strptime(email_date, "%d-%b-%Y")
        start_dt = datetime.strptime(start_date, "%d-%b-%Y")
        end_dt = datetime.strptime(end_date, "%d-%b-%Y")
        return start_dt <= email_dt <= end_dt
    except ValueError:
        return False


def scrape_emails_from_sender(sender_email: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails from a specific sender within a date range.
    Uses intelligent caching to avoid re-scraping.
    """
    print(f"Scraping emails from {sender_email} between {start_date} and {end_date}")

    # Load existing database
    db = _load_email_db()
    sender_email = sender_email.lower().strip()

    # Check if we have cached emails for this sender
    if sender_email in db:
        cached_emails = db[sender_email].get("emails", [])

        # Filter cached emails by date range
        filtered_emails = [
            email for email in cached_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]

        # Check if we need to scrape more recent emails
        last_scraped = db[sender_email].get("last_scraped", "01-Jan-2020")
        today = datetime.today().strftime("%d-%b-%Y")

        if last_scraped == today and filtered_emails:
            print(f"Using cached emails (last scraped: {last_scraped})")
            return filtered_emails

    # Need to scrape emails
    try:
        mail = _imap_connect()

        # Prepare IMAP search criteria
        start_imap = _date_to_imap_format(start_date)

        # Add one day to end_date for the BEFORE criterion (IMAP BEFORE is exclusive)
        end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
        end_imap = end_dt.strftime("%d-%b-%Y")

        search_criteria = f'(FROM "{sender_email}") SINCE "{start_imap}" BEFORE "{end_imap}"'
        print(f"IMAP search: {search_criteria}")

        # Search for emails
        status, data = mail.search(None, search_criteria)
        if status != "OK":
            raise Exception(f"IMAP search failed: {status}")

        email_ids = data[0].split()
        print(f"Found {len(email_ids)} emails")

        scraped_emails = []

        # Process each email
        for i, email_id in enumerate(email_ids):
            try:
                print(f"Processing email {i + 1}/{len(email_ids)}")

                # Fetch email
                status, msg_data = mail.fetch(email_id, "(RFC822)")
                if status != "OK":
                    continue

                # Parse email
                msg = message_from_bytes(msg_data[0][1])

                # Extract information
                subject = msg.get("Subject", "No Subject")
                content = _email_to_clean_text(msg)

                # Parse date
                date_header = msg.get("Date", "")
                if date_header:
                    try:
                        dt_obj = parsedate_to_datetime(date_header)
                        # Convert to IST
                        ist_dt = dt_obj.astimezone(ZoneInfo("Asia/Kolkata"))
                        email_date = ist_dt.strftime("%d-%b-%Y")
                        email_time = ist_dt.strftime("%H:%M:%S")
                    except Exception:
                        email_date = datetime.today().strftime("%d-%b-%Y")
                        email_time = "00:00:00"
                else:
                    email_date = datetime.today().strftime("%d-%b-%Y")
                    email_time = "00:00:00"

                # Get message ID for deduplication
                message_id = msg.get("Message-ID", f"missing-{email_id.decode()}")

                scraped_emails.append({
                    "date": email_date,
                    "time": email_time,
                    "subject": subject,
                    "content": content[:2000],  # Limit content length
                    "message_id": message_id,
                })

            except Exception as e:
                print(f"Error processing email {email_id}: {e}")
                continue

        mail.logout()

        # Update database
        if sender_email not in db:
            db[sender_email] = {"emails": [], "last_scraped": ""}

        # Merge with existing emails (avoid duplicates)
        existing_emails = db[sender_email].get("emails", [])
        existing_ids = {email.get("message_id") for email in existing_emails}

        new_emails = [
            email for email in scraped_emails
            if email["message_id"] not in existing_ids
        ]

        db[sender_email]["emails"] = existing_emails + new_emails
        db[sender_email]["last_scraped"] = datetime.today().strftime("%d-%b-%Y")

        # Save database
        _save_email_db(db)

        # Return filtered results
        all_emails = db[sender_email]["emails"]
        filtered_emails = [
            email for email in all_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]

        print(f"Scraped {len(new_emails)} new emails, returning {len(filtered_emails)} in date range")
        return filtered_emails

    except Exception as e:
        print(f"Email scraping failed: {e}")
        raise


# Test the scraper
if __name__ == "__main__":
    try:
        emails = scrape_emails_from_sender(
            "noreply@example.com",
            "01-Jun-2025",
            "07-Jun-2025",
        )
        print(f"\nFound {len(emails)} emails:")
        for email in emails[:3]:  # Show first 3
            print(f"- {email['date']} {email['time']}: {email['subject']}")
    except Exception as e:
        print(f"Test failed: {e}")
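
# For reference, email_db.json is keyed by the lowercased sender address. An
# illustrative (not real) entry looks like the following; all values are
# placeholders matching the fields written by scrape_emails_from_sender():
#
# {
#   "noreply@example.com": {
#     "emails": [
#       {
#         "date": "02-Jun-2025",
#         "time": "14:05:33",
#         "subject": "Example subject",
#         "content": "Cleaned body text (truncated to 2000 characters)",
#         "message_id": "<abc123@example.com>"
#       }
#     ],
#     "last_scraped": "07-Jun-2025"
#   }
# }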