#!/usr/bin/env python3
"""
Enhanced email scraper with intelligent caching: fetches messages from Gmail
over IMAP and caches them per sender in a local JSON file so repeated queries
do not re-scrape mail that has already been fetched.
"""

import os
import imaplib
import json
from email import message_from_bytes
from email.header import decode_header, make_header
from email.utils import parsedate_to_datetime
from datetime import datetime, timedelta
from typing import List, Dict
from zoneinfo import ZoneInfo

from bs4 import BeautifulSoup
from dotenv import load_dotenv

load_dotenv()

# Email credentials
APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")
EMAIL_DB_FILE = "email_db.json"
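
# Cache layout written by _save_email_db below (values are illustrative):
# {
#   "sender@example.com": {
#     "emails": [
#       {"date": "05-Jun-2025", "time": "14:30:00", "subject": "...",
#        "content": "...", "message_id": "<abc@mail.example.com>"}
#     ],
#     "last_scraped": "07-Jun-2025"
#   }
# }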

def _imap_connect():
    """Connect to the Gmail IMAP server and select the All Mail folder."""
    try:
        mail = imaplib.IMAP4_SSL("imap.gmail.com")
        mail.login(EMAIL_ID, APP_PASSWORD)
        # readonly=True so fetching messages does not set their \Seen flag
        mail.select('"[Gmail]/All Mail"', readonly=True)
        return mail
    except Exception as e:
        print(f"IMAP connection failed: {e}")
        raise
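
# Note: Gmail's IMAP endpoint rejects regular account passwords for this kind
# of basic login, so APP_PASSWORD is expected to be a Google "app password"
# generated for the account in EMAIL_ID.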

def _email_to_clean_text(msg):
    """Extract clean text from an email message, preferring HTML over plain text."""
    html_content = None
    text_content = None
    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == "text/html":
                try:
                    html_content = part.get_payload(decode=True).decode(errors="ignore")
                except Exception:
                    continue
            elif content_type == "text/plain":
                try:
                    text_content = part.get_payload(decode=True).decode(errors="ignore")
                except Exception:
                    continue
    else:
        # Non-multipart message: the payload is the whole body
        content_type = msg.get_content_type()
        try:
            content = msg.get_payload(decode=True).decode(errors="ignore")
            if content_type == "text/html":
                html_content = content
            else:
                text_content = content
        except Exception:
            pass
    # Prefer HTML (stripped of markup) over plain text
    if html_content:
        soup = BeautifulSoup(html_content, "html.parser")
        # Remove script and style elements before extracting text
        for script in soup(["script", "style"]):
            script.decompose()
        return soup.get_text(separator=' ', strip=True)
    elif text_content:
        return text_content.strip()
    else:
        return ""

def _load_email_db() -> Dict:
    """Load the email database from file."""
    if not os.path.exists(EMAIL_DB_FILE):
        return {}
    try:
        with open(EMAIL_DB_FILE, "r") as f:
            return json.load(f)
    except (json.JSONDecodeError, IOError):
        print(f"Warning: Could not load {EMAIL_DB_FILE}, starting with empty database")
        return {}

def _save_email_db(db: Dict):
    """Save the email database to file."""
    try:
        with open(EMAIL_DB_FILE, "w") as f:
            json.dump(db, f, indent=2)
    except IOError as e:
        print(f"Error saving database: {e}")
        raise
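
# Note: this write is not atomic, so a crash mid-dump can leave a truncated
# JSON file; _load_email_db tolerates that by falling back to an empty dict.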

def _date_to_imap_format(date_str: str) -> str:
    """Validate a DD-MMM-YYYY date string and return it in IMAP date format
    (which uses the same DD-Mon-YYYY layout)."""
    try:
        dt = datetime.strptime(date_str, "%d-%b-%Y")
        return dt.strftime("%d-%b-%Y")
    except ValueError:
        raise ValueError(f"Invalid date format: {date_str}. Expected DD-MMM-YYYY")
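
# Example: _date_to_imap_format("05-Jun-2025") -> "05-Jun-2025". The
# strptime/strftime round-trip acts as a validator, since the input format and
# IMAP's date format coincide; malformed input raises ValueError.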

def _is_date_in_range(email_date: str, start_date: str, end_date: str) -> bool:
    """Check if the email date is within the specified range (inclusive)."""
    try:
        email_dt = datetime.strptime(email_date, "%d-%b-%Y")
        start_dt = datetime.strptime(start_date, "%d-%b-%Y")
        end_dt = datetime.strptime(end_date, "%d-%b-%Y")
        return start_dt <= email_dt <= end_dt
    except ValueError:
        return False
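
# Example: _is_date_in_range("03-Jun-2025", "01-Jun-2025", "07-Jun-2025") -> True
#          _is_date_in_range("08-Jun-2025", "01-Jun-2025", "07-Jun-2025") -> False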

def scrape_emails_from_sender(sender_email: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails from a specific sender within a date range.
    Uses the JSON cache to avoid re-scraping senders already fetched today.
    """
    print(f"Scraping emails from {sender_email} between {start_date} and {end_date}")
    # Load existing database
    db = _load_email_db()
    sender_email = sender_email.lower().strip()
    # Check whether we have cached emails for this sender
    if sender_email in db:
        cached_emails = db[sender_email].get("emails", [])
        # Filter cached emails by date range
        filtered_emails = [
            email for email in cached_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]
        # Only reuse the cache if it was refreshed today and has hits
        last_scraped = db[sender_email].get("last_scraped", "01-Jan-2020")
        today = datetime.today().strftime("%d-%b-%Y")
        if last_scraped == today and filtered_emails:
            print(f"Using cached emails (last scraped: {last_scraped})")
            return filtered_emails
    # Cache miss or stale: scrape from the IMAP server
    try:
        mail = _imap_connect()
        # Prepare IMAP search criteria
        start_imap = _date_to_imap_format(start_date)
        # Add one day to end_date because IMAP BEFORE is exclusive
        end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
        end_imap = end_dt.strftime("%d-%b-%Y")
        search_criteria = f'(FROM "{sender_email}") SINCE "{start_imap}" BEFORE "{end_imap}"'
        print(f"IMAP search: {search_criteria}")
        try:
            # Search for matching messages
            status, data = mail.search(None, search_criteria)
            if status != 'OK':
                raise Exception(f"IMAP search failed: {status}")
            email_ids = data[0].split()
            print(f"Found {len(email_ids)} emails")
            scraped_emails = []
            # Process each email
            for i, email_id in enumerate(email_ids):
                try:
                    print(f"Processing email {i+1}/{len(email_ids)}")
                    # Fetch the full RFC822 message
                    status, msg_data = mail.fetch(email_id, "(RFC822)")
                    if status != 'OK':
                        continue
                    # Parse email
                    msg = message_from_bytes(msg_data[0][1])
                    # Decode RFC 2047 encoded-words (e.g. "=?UTF-8?B?...?=") in the subject
                    raw_subject = msg.get("Subject", "No Subject")
                    try:
                        subject = str(make_header(decode_header(raw_subject)))
                    except Exception:
                        subject = raw_subject
                    content = _email_to_clean_text(msg)
                    # Parse the Date header and convert it to IST
                    date_header = msg.get("Date", "")
                    if date_header:
                        try:
                            dt_obj = parsedate_to_datetime(date_header)
                            ist_dt = dt_obj.astimezone(ZoneInfo("Asia/Kolkata"))
                            email_date = ist_dt.strftime("%d-%b-%Y")
                            email_time = ist_dt.strftime("%H:%M:%S")
                        except Exception:
                            email_date = datetime.today().strftime("%d-%b-%Y")
                            email_time = "00:00:00"
                    else:
                        email_date = datetime.today().strftime("%d-%b-%Y")
                        email_time = "00:00:00"
                    # Message-ID is used for deduplication against the cache
                    message_id = msg.get("Message-ID", f"missing-{email_id.decode()}")
                    scraped_emails.append({
                        "date": email_date,
                        "time": email_time,
                        "subject": subject,
                        "content": content[:2000],  # Limit content length
                        "message_id": message_id
                    })
                except Exception as e:
                    print(f"Error processing email {email_id}: {e}")
                    continue
        finally:
            # Always drop the IMAP connection, even if search/fetch failed
            try:
                mail.logout()
            except Exception:
                pass
        # Update database
        if sender_email not in db:
            db[sender_email] = {"emails": [], "last_scraped": ""}
        # Merge with existing emails, skipping duplicates by Message-ID
        existing_emails = db[sender_email].get("emails", [])
        existing_ids = {email.get("message_id") for email in existing_emails}
        new_emails = [
            email for email in scraped_emails
            if email["message_id"] not in existing_ids
        ]
        db[sender_email]["emails"] = existing_emails + new_emails
        db[sender_email]["last_scraped"] = datetime.today().strftime("%d-%b-%Y")
        # Save database
        _save_email_db(db)
        # Return only the emails that fall inside the requested range
        all_emails = db[sender_email]["emails"]
        filtered_emails = [
            email for email in all_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]
        print(f"Scraped {len(new_emails)} new emails, returning {len(filtered_emails)} in date range")
        return filtered_emails
    except Exception as e:
        print(f"Email scraping failed: {e}")
        raise
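
# Note: deleting EMAIL_DB_FILE (email_db.json) clears the cache and forces a
# full re-scrape on the next call.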

# Test the scraper
if __name__ == "__main__":
    try:
        emails = scrape_emails_from_sender(
            "[email protected]",
            "01-Jun-2025",
            "07-Jun-2025"
        )
        print(f"\nFound {len(emails)} emails:")
        for email in emails[:3]:  # Show first 3
            print(f"- {email['date']} {email['time']}: {email['subject']}")
    except Exception as e:
        print(f"Test failed: {e}")