Da-123's picture
clean
5c85daa
raw
history blame
11.3 kB
#!/usr/bin/env python3
"""
Enhanced Email Scraper with Intelligent Caching
"""
import os
import imaplib
import json
from email import message_from_bytes
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
from dotenv import load_dotenv
from zoneinfo import ZoneInfo
from email.utils import parsedate_to_datetime
from typing import List, Dict
from logger import logger
# Run python-dotenv's loader before the credentials below are read from the
# environment (presumably picking up a local .env file — confirm deployment).
load_dotenv()
# Email credentials, read once at import time; either is None when unset.
APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")
# NOTE(review): this prints the full address every time the module is
# imported — confirm that is intended outside of debugging.
print("EMAIL_ID: ", EMAIL_ID)
# Path of the JSON file read by _load_email_db and written by _save_email_db.
EMAIL_DB_FILE = "email_db.json"
def validate_email_setup():
    """Check that every credential the scraper depends on is configured.

    The following environment variables are read at call time and must be
    set to a non-empty value:

    * ``EMAIL_ID`` and ``APP_PASSWORD`` - used for the IMAP login.
    * ``OPENAI_API_KEY`` - needed for query processing.

    Every missing variable is reported on stdout.

    Returns:
        bool: True when all variables are present, False otherwise.
    """
    print("=== Email Setup Validation ===")
    # Variable name -> message printed when it is missing. The email
    # credentials are checked here too so that "All credentials look good!"
    # is never printed for a setup that cannot log in.
    required = {
        "EMAIL_ID": "EMAIL_ID not set (needed for IMAP login)",
        "APP_PASSWORD": "APP_PASSWORD not set (needed for IMAP login)",
        "OPENAI_API_KEY": "OPENAI_API_KEY not set (needed for query processing)",
    }
    issues = [message for name, message in required.items() if not os.getenv(name)]
    if issues:
        print("❌ Issues found:")
        for issue in issues:
            print(f" - {issue}")
        return False
    print("✅ All credentials look good!")
    return True
def _discard_connection(mail):
    """Best-effort close of a half-initialised IMAP connection (never raises)."""
    if mail is None:
        return
    try:
        mail.logout()
    except Exception:
        # LOGOUT itself failed (e.g. broken socket); make sure the socket is
        # still released.
        try:
            mail.shutdown()
        except Exception:
            pass


def _imap_connect():
    """Open an authenticated Gmail IMAP session with "[Gmail]/All Mail" selected.

    Progress and troubleshooting hints are printed to stdout. If anything
    fails after the socket has been opened, the connection is closed before
    the error is propagated so no socket is leaked.

    Returns:
        imaplib.IMAP4_SSL: a logged-in connection with the mailbox selected.
        The caller is responsible for calling ``logout()`` on it.

    Raises:
        Exception: ``EMAIL_ID`` or ``APP_PASSWORD`` is missing.
        imaplib.IMAP4.error: the login was rejected or the mailbox could not
            be selected.
        Exception: any other connection failure is re-raised unchanged.
    """
    print("=== IMAP Connection Debug ===")
    # Check if environment variables are loaded
    print(f"EMAIL_ID loaded: {'✅ Yes' if EMAIL_ID else '❌ No (None/Empty)'}")
    print(f"APP_PASSWORD loaded: {'✅ Yes' if APP_PASSWORD else '❌ No (None/Empty)'}")
    if EMAIL_ID:
        print(f"Email ID: {EMAIL_ID[:5]}...@{EMAIL_ID.split('@')[1] if '@' in EMAIL_ID else 'INVALID'}")
    if not EMAIL_ID or not APP_PASSWORD:
        error_msg = "Missing credentials in environment variables!"
        print(f"❌ {error_msg}")
        raise Exception(error_msg)

    mail = None
    try:
        print("🔄 Attempting IMAP SSL connection to imap.gmail.com:993...")
        mail = imaplib.IMAP4_SSL("imap.gmail.com")
        print("✅ SSL connection established")
        print("🔄 Attempting login...")
        result = mail.login(EMAIL_ID, APP_PASSWORD)
        print(f"✅ Login successful: {result}")
        print("🔄 Selecting mailbox: [Gmail]/All Mail...")
        result = mail.select('"[Gmail]/All Mail"')
        # select() reports a missing mailbox through its status instead of
        # raising, so a "NO" answer must not be treated as success.
        if result[0] != "OK":
            raise imaplib.IMAP4.error(f"Could not select [Gmail]/All Mail: {result}")
        print(f"✅ Mailbox selected: {result}")
        print("=== IMAP Connection Successful ===")
        return mail
    except imaplib.IMAP4.error as e:
        _discard_connection(mail)
        print(f"❌ IMAP Error: {e}")
        print("💡 Possible causes:")
        print(" - App Password is incorrect or expired")
        print(" - 2FA not enabled on Gmail account")
        print(" - IMAP access not enabled in Gmail settings")
        print(" - Gmail account locked or requires security verification")
        raise
    except Exception as e:
        _discard_connection(mail)
        print(f"❌ Connection Error: {e}")
        print("💡 Possible causes:")
        print(" - Network connectivity issues")
        print(" - Gmail IMAP server temporarily unavailable")
        print(" - Firewall blocking IMAP port 993")
        raise
def _email_to_clean_text(msg):
"""Extract clean text from email message"""
# Try HTML first
html_content = None
text_content = None
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/html":
try:
html_content = part.get_payload(decode=True).decode(errors="ignore")
except:
continue
elif content_type == "text/plain":
try:
text_content = part.get_payload(decode=True).decode(errors="ignore")
except:
continue
else:
# Non-multipart message
content_type = msg.get_content_type()
try:
content = msg.get_payload(decode=True).decode(errors="ignore")
if content_type == "text/html":
html_content = content
else:
text_content = content
except:
pass
# Clean HTML content
if html_content:
soup = BeautifulSoup(html_content, "html.parser")
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
return soup.get_text(separator=' ', strip=True)
elif text_content:
return text_content.strip()
else:
return ""
def _load_email_db() -> Dict:
    """Read the JSON email database from ``EMAIL_DB_FILE``.

    Returns:
        The parsed JSON content. An empty dict is returned when the file does
        not exist, and also (after printing a warning) when it cannot be read
        or does not contain valid JSON.
    """
    db_path = EMAIL_DB_FILE
    if os.path.exists(db_path):
        try:
            with open(db_path, "r") as handle:
                raw_json = handle.read()
            return json.loads(raw_json)
        except (json.JSONDecodeError, OSError):
            print(f"Warning: Could not load {db_path}, starting with empty database")
    return {}
def _save_email_db(db: Dict):
    """Write *db* to ``EMAIL_DB_FILE`` as JSON indented by two spaces.

    Args:
        db: the JSON-serialisable database to persist.

    Raises:
        OSError: the file could not be opened or written; the error is
            printed and then re-raised.
    """
    try:
        with open(EMAIL_DB_FILE, "w") as handle:
            json.dump(obj=db, fp=handle, indent=2)
    except OSError as err:
        print(f"Error saving database: {err}")
        raise
def _date_to_imap_format(date_str: str) -> str:
"""Convert DD-MMM-YYYY to IMAP date format"""
try:
dt = datetime.strptime(date_str, "%d-%b-%Y")
return dt.strftime("%d-%b-%Y")
except ValueError:
raise ValueError(f"Invalid date format: {date_str}. Expected DD-MMM-YYYY")
def _is_date_in_range(email_date: str, start_date: str, end_date: str) -> bool:
"""Check if email date is within the specified range"""
try:
email_dt = datetime.strptime(email_date, "%d-%b-%Y")
start_dt = datetime.strptime(start_date, "%d-%b-%Y")
end_dt = datetime.strptime(end_date, "%d-%b-%Y")
return start_dt <= email_dt <= end_dt
except ValueError:
return False
def _imap_quote(value: str) -> str:
    """Return *value* as an IMAP quoted string, safe to embed in a SEARCH command."""
    # CR/LF cannot appear inside a quoted string and would terminate the command.
    cleaned = value.replace("\r", " ").replace("\n", " ")
    return '"' + cleaned.replace("\\", "\\\\").replace('"', '\\"') + '"'


def _decode_header_value(value, default: str) -> str:
    """Decode an RFC 2047 encoded header to readable text (*default* when absent)."""
    if value is None:
        return default
    from email.errors import HeaderParseError
    from email.header import decode_header, make_header

    try:
        return str(make_header(decode_header(value)))
    except (HeaderParseError, LookupError, UnicodeError, ValueError):
        # Malformed encoded-word or unknown charset: keep the raw text.
        return str(value)


def _email_date_time_ist(date_header):
    """Return the ``(DD-MMM-YYYY, HH:MM:SS)`` of a Date header converted to IST.

    Falls back to today's date at ``00:00:00`` when the header is missing or
    cannot be parsed.
    """
    if date_header:
        try:
            ist_dt = parsedate_to_datetime(str(date_header)).astimezone(ZoneInfo("Asia/Kolkata"))
            return ist_dt.strftime("%d-%b-%Y"), ist_dt.strftime("%H:%M:%S")
        except Exception:
            # Best-effort: any parsing/conversion problem uses the fallback below.
            pass
    return datetime.today().strftime("%d-%b-%Y"), "00:00:00"


def scrape_emails_by_text_search(keyword: str, start_date: str, end_date: str) -> List[Dict]:
    """Collect emails that mention *keyword* within an inclusive date range.

    The keyword is searched on the server in the FROM, SUBJECT and BODY
    fields (one IMAP search each, results merged). Every hit is fetched
    without marking it as read, re-checked locally for the keyword
    (case-insensitive) and for the date range, where the message date is the
    ``Date`` header converted to Asia/Kolkata time.

    Args:
        keyword: text to look for, e.g. a company name or a sender address.
        start_date: first day of the range, ``DD-MMM-YYYY``.
        end_date: last day of the range (inclusive), ``DD-MMM-YYYY``.

    Returns:
        A list of dicts with the keys ``date``, ``time``, ``subject``,
        ``from``, ``content`` (at most 2000 characters) and ``message_id``,
        sorted newest first.

    Raises:
        Exception: the setup validation failed.
        ValueError: *start_date* or *end_date* is not ``DD-MMM-YYYY``.
        Exception: connection or login errors from ``_imap_connect`` are
            printed and re-raised.
    """
    print(f"Searching emails containing '{keyword}' between {start_date} and {end_date}")
    # Validate setup first
    if not validate_email_setup():
        raise Exception("Email setup validation failed. Please check your .env file and credentials.")

    mail = None
    try:
        mail = _imap_connect()

        start_dt = datetime.strptime(_date_to_imap_format(start_date), "%d-%b-%Y")
        end_dt = datetime.strptime(end_date, "%d-%b-%Y")
        # IMAP SINCE/BEFORE compare the server's internal date and ignore
        # time zones, while the range check below uses the Date header in
        # IST. Ask the server for one extra day on each side so boundary-day
        # messages are not lost; the local check keeps the result exact.
        # (BEFORE is exclusive, hence +2 rather than +1.)
        since_imap = (start_dt - timedelta(days=1)).strftime("%d-%b-%Y")
        before_imap = (end_dt + timedelta(days=2)).strftime("%d-%b-%Y")

        # The keyword is user input: quote/escape it so characters such as
        # '"' cannot break or extend the SEARCH command.
        quoted_keyword = _imap_quote(keyword)
        date_clause = f'SINCE "{since_imap}" BEFORE "{before_imap}"'
        search_criteria_list = [
            f"{field} {quoted_keyword} {date_clause}"
            for field in ("FROM", "SUBJECT", "BODY")
        ]

        all_email_ids = set()
        # Search with multiple criteria to catch emails containing the keyword
        for search_criteria in search_criteria_list:
            try:
                print(f"IMAP search: {search_criteria}")
                status, data = mail.search(None, search_criteria)
                if status == 'OK' and data[0]:
                    email_ids = data[0].split()
                    all_email_ids.update(email_ids)
                    print(f"Found {len(email_ids)} emails with this criteria")
            except Exception as e:
                # One failing criterion must not abort the other searches.
                print(f"Search criteria failed: {search_criteria}, error: {e}")
                continue
        print(f"Total unique emails found: {len(all_email_ids)}")

        scraped_emails = []
        keyword_lower = keyword.lower()
        # Process each email (in ascending sequence-number order so runs are
        # reproducible).
        for i, email_id in enumerate(sorted(all_email_ids, key=int)):
            try:
                print(f"Processing email {i+1}/{len(all_email_ids)}")
                # BODY.PEEK[] returns the full message like RFC822 does, but
                # without setting the \Seen flag on the user's mail.
                status, msg_data = mail.fetch(email_id, "(BODY.PEEK[])")
                if status != 'OK' or not msg_data or not isinstance(msg_data[0], tuple):
                    continue
                # Parse email
                msg = message_from_bytes(msg_data[0][1])
                # Extract information (headers may be RFC 2047 encoded)
                subject = _decode_header_value(msg.get("Subject"), "No Subject")
                from_header = _decode_header_value(msg.get("From"), "Unknown Sender")
                content = _email_to_clean_text(msg)
                # Check if the keyword is actually present (case-insensitive)
                if not any(keyword_lower in text.lower() for text in (subject, from_header, content)):
                    continue
                email_date, email_time = _email_date_time_ist(msg.get("Date", ""))
                # Double-check date range
                if not _is_date_in_range(email_date, start_date, end_date):
                    continue
                # Get message ID for deduplication
                message_id = msg.get("Message-ID", f"missing-{email_id.decode()}")
                scraped_emails.append({
                    "date": email_date,
                    "time": email_time,
                    "subject": subject,
                    "from": from_header,
                    "content": content[:2000],  # Limit content length
                    "message_id": message_id
                })
            except Exception as e:
                # A single broken message is skipped, not fatal.
                print(f"Error processing email {email_id}: {e}")
                continue

        # Sort by date (newest first)
        scraped_emails.sort(
            key=lambda x: datetime.strptime(f"{x['date']} {x['time']}", "%d-%b-%Y %H:%M:%S"),
            reverse=True,
        )
        print(f"Successfully processed {len(scraped_emails)} emails containing '{keyword}'")
        return scraped_emails
    except Exception as e:
        print(f"Email text search failed: {e}")
        raise
    finally:
        # Always release the connection, including on errors.
        if mail is not None:
            try:
                mail.logout()
            except Exception:
                pass
# Manual smoke test: run one search and print a short summary of the hits.
if __name__ == "__main__":
    # NOTE(review): the keyword literal below looks like an obfuscation
    # placeholder rather than a real search term — confirm before relying on it.
    try:
        found = scrape_emails_by_text_search("[email protected]", "01-Jun-2025", "07-Jun-2025")
        print(f"\nFound {len(found)} emails:")
        preview = found[:3]  # Show first 3
        for entry in preview:
            print(f"- {entry['date']} {entry['time']}: {entry['subject']}")
    except Exception as e:
        print(f"Test failed: {e}")