devangshrivastava
logger file introduced
40ae10a
raw
history blame
17.8 kB
#!/usr/bin/env python3
"""
Enhanced Email Scraper with Intelligent Caching
"""
import os
import imaplib
import json
from email import message_from_bytes
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
from dotenv import load_dotenv
from zoneinfo import ZoneInfo
from email.utils import parsedate_to_datetime
from typing import List, Dict
# Runs before the os.getenv() calls below so that values from a .env file are
# visible to them (presumably python-dotenv's default lookup — confirm).
load_dotenv()
# Email credentials
# Both values are passed to mail.login() in _imap_connect(); either may be
# None when the corresponding environment variable is not set.
APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")
# NOTE(review): this prints the configured address every time the module is
# imported, not only when it is run as a script.
print("EMAIL_ID: ", EMAIL_ID)
# JSON file used by _load_email_db()/_save_email_db() as the on-disk cache.
# The path is relative, so it resolves against the current working directory.
EMAIL_DB_FILE = "email_db.json"
def validate_email_setup():
    """Check that the environment is configured well enough to run a query.

    Only ``OPENAI_API_KEY`` is verified here: it must be set to a non-empty
    value. The email credentials themselves are checked later, when
    ``_imap_connect()`` is called.

    Prints a short human-readable report to stdout.

    Returns:
        bool: True when no problem was found, False otherwise.
    """
    print("=== Email Setup Validation ===")

    # NOTE: format checks for EMAIL_ID / APP_PASSWORD used to live here as
    # commented-out code; they were not executed and have been removed.
    issues = []
    if not os.getenv("OPENAI_API_KEY"):
        issues.append("OPENAI_API_KEY not set (needed for query processing)")

    if issues:
        print("❌ Issues found:")
        for issue in issues:
            print(f" - {issue}")
        return False

    # The check mark was stored as mis-decoded UTF-8 ("βœ…"); restored here.
    print("✅ All credentials look good!")
    return True
def _discard_connection(mail):
    """Best-effort logout of a connection that failed during setup."""
    if mail is None:
        return
    try:
        mail.logout()
    except (imaplib.IMAP4.error, OSError):
        # The connection is already unusable; the original error is what
        # the caller needs to see, so cleanup failures are ignored.
        pass


def _imap_connect():
    """Open an authenticated Gmail IMAP session with "All Mail" selected.

    Progress and troubleshooting hints are printed to stdout.

    Returns:
        imaplib.IMAP4_SSL: a connection logged in with EMAIL_ID /
        APP_PASSWORD and with the ``[Gmail]/All Mail`` mailbox selected.
        The caller is responsible for logging out.

    Raises:
        Exception: if EMAIL_ID or APP_PASSWORD is missing or empty.
        imaplib.IMAP4.error: if login is rejected or the mailbox cannot be
            selected.
        Exception: any other connection failure is re-raised unchanged.
    """
    print("=== IMAP Connection Debug ===")
    # Check if environment variables are loaded
    print(f"EMAIL_ID loaded: {'✅ Yes' if EMAIL_ID else '❌ No (None/Empty)'}")
    print(f"APP_PASSWORD loaded: {'✅ Yes' if APP_PASSWORD else '❌ No (None/Empty)'}")
    if EMAIL_ID:
        # Only the first characters and the domain are echoed.
        print(f"Email ID: {EMAIL_ID[:5]}...@{EMAIL_ID.split('@')[1] if '@' in EMAIL_ID else 'INVALID'}")

    if not EMAIL_ID or not APP_PASSWORD:
        error_msg = "Missing credentials in environment variables!"
        print(f"❌ {error_msg}")
        raise Exception(error_msg)

    mail = None
    try:
        print("🔄 Attempting IMAP SSL connection to imap.gmail.com:993...")
        mail = imaplib.IMAP4_SSL("imap.gmail.com")
        print("✅ SSL connection established")

        print("🔄 Attempting login...")
        result = mail.login(EMAIL_ID, APP_PASSWORD)
        print(f"✅ Login successful: {result}")

        print("🔄 Selecting mailbox: [Gmail]/All Mail...")
        result = mail.select('"[Gmail]/All Mail"')
        # select() reports a missing/inaccessible mailbox through its status
        # instead of raising, so it has to be checked explicitly; otherwise
        # the failure only surfaces later as a confusing SEARCH error.
        if result[0] != 'OK':
            raise imaplib.IMAP4.error(f"Could not select [Gmail]/All Mail: {result}")
        print(f"✅ Mailbox selected: {result}")

        print("=== IMAP Connection Successful ===")
        return mail
    except imaplib.IMAP4.error as e:
        _discard_connection(mail)  # do not leak the socket on failure
        print(f"❌ IMAP Error: {e}")
        print("💡 Possible causes:")
        print(" - App Password is incorrect or expired")
        print(" - 2FA not enabled on Gmail account")
        print(" - IMAP access not enabled in Gmail settings")
        print(" - Gmail account locked or requires security verification")
        raise
    except Exception as e:
        _discard_connection(mail)
        print(f"❌ Connection Error: {e}")
        print("💡 Possible causes:")
        print(" - Network connectivity issues")
        print(" - Gmail IMAP server temporarily unavailable")
        print(" - Firewall blocking IMAP port 993")
        raise
def _decode_text_part(part):
    """Return the decoded text of one message part, or None if it has no bytes.

    The part's declared charset is honoured; UTF-8 is used when none is
    declared or when the declared name is not a known text codec.
    Undecodable bytes are dropped rather than raising.
    """
    payload = part.get_payload(decode=True)
    if not isinstance(payload, bytes):
        # Container parts (multipart/*) have no payload of their own.
        return None
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        return payload.decode("utf-8", errors="ignore")


def _email_to_clean_text(msg):
    """Extract readable body text from a parsed email message.

    HTML is preferred over plain text. For HTML, ``<script>`` and ``<style>``
    elements are removed and the remaining text is joined with single spaces.

    Args:
        msg: an ``email.message.Message`` (as returned by
            ``message_from_bytes``).

    Returns:
        str: the cleaned body text, or ``""`` when no text could be found.
    """
    html_content = None
    text_content = None

    if msg.is_multipart():
        for part in msg.walk():
            # Attached .txt/.html files are not the message body; without
            # this check they would overwrite the real body text.
            if part.get_content_disposition() == "attachment":
                continue
            content_type = part.get_content_type()
            if content_type not in ("text/html", "text/plain"):
                continue
            decoded = _decode_text_part(part)
            if decoded is None:
                continue
            if content_type == "text/html":
                html_content = decoded
            else:
                text_content = decoded
    else:
        # Non-multipart message: anything that is not HTML is treated as text.
        decoded = _decode_text_part(msg)
        if decoded is not None:
            if msg.get_content_type() == "text/html":
                html_content = decoded
            else:
                text_content = decoded

    if html_content:
        soup = BeautifulSoup(html_content, "html.parser")
        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.decompose()
        return soup.get_text(separator=' ', strip=True)
    if text_content:
        return text_content.strip()
    return ""
def _load_email_db() -> Dict:
    """Read the cached email database from EMAIL_DB_FILE.

    Returns:
        The parsed JSON content, or an empty dict when the file does not
        exist, cannot be read, or does not contain valid JSON (a warning is
        printed in the latter two cases).
    """
    if os.path.exists(EMAIL_DB_FILE):
        try:
            with open(EMAIL_DB_FILE, "r") as db_file:
                return json.load(db_file)
        except (json.JSONDecodeError, IOError):
            print(f"Warning: Could not load {EMAIL_DB_FILE}, starting with empty database")
    # Missing file and unreadable file both fall through to an empty database.
    return {}
def _save_email_db(db: Dict):
    """Write the email database to EMAIL_DB_FILE as indented JSON.

    The data is first written to a sibling ``<EMAIL_DB_FILE>.tmp`` file and
    then moved over the real file with ``os.replace``. Opening the real file
    in "w" mode directly would truncate it before serialization had
    succeeded, so a failed or interrupted dump would destroy the cache.

    Args:
        db: JSON-serializable mapping to persist.

    Raises:
        OSError: if the file cannot be written or replaced (also printed).
        TypeError / ValueError: propagated from ``json.dump`` when ``db`` is
            not serializable; the existing file is left untouched.
    """
    tmp_path = f"{EMAIL_DB_FILE}.tmp"
    try:
        try:
            with open(tmp_path, "w") as f:
                json.dump(db, f, indent=2)
            os.replace(tmp_path, EMAIL_DB_FILE)
        except IOError as e:
            print(f"Error saving database: {e}")
            raise
    finally:
        # After a successful replace the temp file no longer exists; after a
        # failure this removes the partial file.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
def _date_to_imap_format(date_str: str) -> str:
    """Validate a DD-MMM-YYYY date and return it in normalized form.

    The input is parsed and re-formatted with the same ``%d-%b-%Y`` pattern,
    so a value such as ``1-jun-2025`` comes back as ``01-Jun-2025``.

    Raises:
        ValueError: if ``date_str`` does not match DD-MMM-YYYY.
    """
    pattern = "%d-%b-%Y"
    try:
        return datetime.strptime(date_str, pattern).strftime(pattern)
    except ValueError:
        raise ValueError(f"Invalid date format: {date_str}. Expected DD-MMM-YYYY")
def _is_date_in_range(email_date: str, start_date: str, end_date: str) -> bool:
    """Tell whether ``email_date`` lies between ``start_date`` and ``end_date``.

    All three arguments are DD-MMM-YYYY strings and both bounds are
    inclusive. If any of them fails to parse, the result is False.
    """
    pattern = "%d-%b-%Y"
    try:
        lower, candidate, upper = (
            datetime.strptime(value, pattern)
            for value in (start_date, email_date, end_date)
        )
    except ValueError:
        return False
    return lower <= candidate <= upper
_MAX_CONTENT_CHARS = 2000  # body text kept per email record


def _quote_imap_string(value: str) -> str:
    """Return ``value`` as an IMAP quoted string.

    Backslashes and double quotes are escaped and CR/LF are dropped, so a
    caller-supplied address or keyword cannot end the quoted string early
    and change the rest of the SEARCH command.
    """
    cleaned = value.replace("\r", "").replace("\n", "")
    escaped = cleaned.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _imap_date_window(start_date: str, end_date: str) -> str:
    """Build the ``SINCE ... BEFORE ...`` clause for an inclusive date range.

    Raises:
        ValueError: if either date is not in DD-MMM-YYYY format.
    """
    start_imap = _date_to_imap_format(start_date)
    # IMAP BEFORE is exclusive, so the upper bound is moved one day forward.
    end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
    end_imap = end_dt.strftime("%d-%b-%Y")
    return f'SINCE "{start_imap}" BEFORE "{end_imap}"'


def _decode_header_text(value, default: str) -> str:
    """Return a header value as plain, human-readable text.

    RFC 2047 encoded-words (``=?UTF-8?B?...?=``) are decoded, and values the
    parser handed back as ``Header`` objects are turned into ``str`` so the
    record stays JSON-serializable. ``default`` is returned for a missing
    header.
    """
    if value is None:
        return default
    from email.header import decode_header, make_header
    try:
        return str(make_header(decode_header(value)))
    except (LookupError, UnicodeError, ValueError):
        # Unknown charset or malformed encoded-word: keep the raw text.
        return str(value)


def _parse_email_timestamp(date_header) -> tuple:
    """Return ``(date, time)`` strings for a Date header, converted to IST.

    ``date`` is DD-MMM-YYYY and ``time`` is HH:MM:SS. When the header is
    missing or cannot be parsed, today's local date and ``00:00:00`` are
    returned instead.
    """
    if date_header:
        try:
            dt_obj = parsedate_to_datetime(str(date_header))
            ist_dt = dt_obj.astimezone(ZoneInfo("Asia/Kolkata"))
            return ist_dt.strftime("%d-%b-%Y"), ist_dt.strftime("%H:%M:%S")
        except Exception:
            # Deliberate best effort: any parsing/conversion problem falls
            # back to the placeholder timestamp below.
            pass
    return datetime.today().strftime("%d-%b-%Y"), "00:00:00"


def _fetch_email_record(mail, email_id: bytes):
    """Fetch one message and convert it into a record dict.

    Returns:
        dict with keys ``date``, ``time``, ``subject``, ``from``,
        ``content`` (untruncated) and ``message_id``; or None when the
        server did not return the message.
    """
    status, msg_data = mail.fetch(email_id, "(RFC822)")
    if status != 'OK' or not msg_data or not isinstance(msg_data[0], tuple):
        return None
    msg = message_from_bytes(msg_data[0][1])
    email_date, email_time = _parse_email_timestamp(msg.get("Date", ""))
    return {
        "date": email_date,
        "time": email_time,
        "subject": _decode_header_text(msg.get("Subject"), "No Subject"),
        "from": _decode_header_text(msg.get("From"), "Unknown Sender"),
        "content": _email_to_clean_text(msg),
        # Used for deduplication; falls back to the mailbox sequence id.
        "message_id": str(msg.get("Message-ID", f"missing-{email_id.decode()}")),
    }


def _logout_quietly(mail) -> None:
    """Log out of the IMAP session without letting cleanup errors escape."""
    try:
        mail.logout()
    except (imaplib.IMAP4.error, OSError) as e:
        print(f"Warning: IMAP logout failed: {e}")


def _cached_range_covers(entry: Dict, start_date: str, end_date: str) -> bool:
    """Tell whether a single recorded scrape range contains the requested one.

    ``entry["scraped_ranges"]`` is a list of ``[start, end]`` pairs written
    by ``scrape_emails_from_sender``. Entries saved before that key existed
    simply have no coverage.
    """
    for covered in entry.get("scraped_ranges", []):
        if not isinstance(covered, (list, tuple)) or len(covered) != 2:
            continue
        lower, upper = covered
        if not (isinstance(lower, str) and isinstance(upper, str)):
            continue
        if (_is_date_in_range(start_date, lower, upper)
                and _is_date_in_range(end_date, lower, upper)):
            return True
    return False


def scrape_emails_from_sender(sender_email: str, start_date: str, end_date: str) -> List[Dict]:
    """Return emails from ``sender_email`` dated within the given range.

    Results are cached in the JSON database under the lower-cased sender
    address. The cache is used without contacting the server only when all
    of the following hold:

    * the sender was last scraped today,
    * a scrape recorded today covered the whole requested range, and
    * the cache holds at least one email in that range.

    Previously the coverage check was missing, so a same-day request for a
    different range could silently return partial results.

    Args:
        sender_email: address to match in the FROM field.
        start_date: first day of the range, DD-MMM-YYYY.
        end_date: last day of the range (inclusive), DD-MMM-YYYY.

    Returns:
        List of dicts with keys ``date``, ``time``, ``subject``,
        ``content`` (at most 2000 characters) and ``message_id``.

    Raises:
        ValueError: if a date is malformed.
        Exception: connection, search or save failures are printed and
            re-raised.
    """
    print(f"Scraping emails from {sender_email} between {start_date} and {end_date}")
    # Load existing database
    db = _load_email_db()
    sender_email = sender_email.lower().strip()
    today = datetime.today().strftime("%d-%b-%Y")

    # Check if we have cached emails for this sender
    entry = db.get(sender_email)
    if entry is not None:
        cached_in_range = [
            item for item in entry.get("emails", [])
            if _is_date_in_range(item["date"], start_date, end_date)
        ]
        last_scraped = entry.get("last_scraped", "01-Jan-2020")
        if (last_scraped == today and cached_in_range
                and _cached_range_covers(entry, start_date, end_date)):
            print(f"Using cached emails (last scraped: {last_scraped})")
            return cached_in_range

    # Need to scrape emails
    try:
        # Built before connecting so that a malformed date does not open
        # (and then abandon) an IMAP session.
        search_criteria = (
            f'(FROM {_quote_imap_string(sender_email)}) '
            f'{_imap_date_window(start_date, end_date)}'
        )
        scraped_emails = []
        mail = _imap_connect()
        try:
            print(f"IMAP search: {search_criteria}")
            status, data = mail.search(None, search_criteria)
            if status != 'OK':
                raise Exception(f"IMAP search failed: {status}")
            email_ids = (data[0] or b"").split()
            print(f"Found {len(email_ids)} emails")

            for i, email_id in enumerate(email_ids):
                try:
                    print(f"Processing email {i+1}/{len(email_ids)}")
                    record = _fetch_email_record(mail, email_id)
                    if record is None:
                        continue
                    # Sender-cache records have never carried a "from" key.
                    record.pop("from", None)
                    record["content"] = record["content"][:_MAX_CONTENT_CHARS]
                    scraped_emails.append(record)
                except Exception as e:
                    print(f"Error processing email {email_id}: {e}")
                    continue
        finally:
            # Always release the session, including on search/fetch errors.
            _logout_quietly(mail)

        # Merge with existing emails (avoid duplicates)
        entry = db.setdefault(sender_email, {"emails": [], "last_scraped": ""})
        existing_emails = entry.get("emails", [])
        known_ids = {item.get("message_id") for item in existing_emails}
        new_emails = []
        for record in scraped_emails:
            if record["message_id"] not in known_ids:
                known_ids.add(record["message_id"])
                new_emails.append(record)
        entry["emails"] = existing_emails + new_emails

        # Coverage is only meaningful for the day it was recorded on, so
        # ranges from an earlier day are discarded.
        ranges = entry.get("scraped_ranges", []) if entry.get("last_scraped") == today else []
        if not isinstance(ranges, list):
            ranges = []
        if [start_date, end_date] not in ranges:
            ranges = ranges + [[start_date, end_date]]
        entry["scraped_ranges"] = ranges
        entry["last_scraped"] = today

        # Save database
        _save_email_db(db)

        # Return filtered results
        filtered_emails = [
            item for item in entry["emails"]
            if _is_date_in_range(item["date"], start_date, end_date)
        ]
        print(f"Scraped {len(new_emails)} new emails, returning {len(filtered_emails)} in date range")
        return filtered_emails
    except Exception as e:
        print(f"Email scraping failed: {e}")
        raise


def scrape_emails_by_text_search(keyword: str, start_date: str, end_date: str) -> List[Dict]:
    """Return emails mentioning ``keyword`` within the given date range.

    Three IMAP searches (FROM, SUBJECT, BODY) are run and their results
    merged. Each fetched message is then re-checked locally: the keyword
    must appear, case-insensitively, in the decoded subject, sender or body
    text, and the IST date must fall inside the range. Results are not
    cached.

    Args:
        keyword: text to look for (for example a company name).
        start_date: first day of the range, DD-MMM-YYYY.
        end_date: last day of the range (inclusive), DD-MMM-YYYY.

    Returns:
        List of dicts with keys ``date``, ``time``, ``subject``, ``from``,
        ``content`` (at most 2000 characters) and ``message_id``, sorted
        newest first.

    Raises:
        Exception: if ``validate_email_setup()`` fails, or on connection
            errors (printed and re-raised).
        ValueError: if a date is malformed.
    """
    print(f"Searching emails containing '{keyword}' between {start_date} and {end_date}")
    # Validate setup first
    if not validate_email_setup():
        raise Exception("Email setup validation failed. Please check your .env file and credentials.")

    try:
        date_window = _imap_date_window(start_date, end_date)
        quoted_keyword = _quote_imap_string(keyword)
        # Search the FROM field, SUBJECT and BODY separately and combine.
        search_criteria_list = [
            f'{field} {quoted_keyword} {date_window}'
            for field in ("FROM", "SUBJECT", "BODY")
        ]
        keyword_lower = keyword.lower()
        scraped_emails = []

        mail = _imap_connect()
        try:
            all_email_ids = set()
            for search_criteria in search_criteria_list:
                try:
                    print(f"IMAP search: {search_criteria}")
                    status, data = mail.search(None, search_criteria)
                    if status == 'OK' and data[0]:
                        email_ids = data[0].split()
                        all_email_ids.update(email_ids)
                        print(f"Found {len(email_ids)} emails with this criteria")
                except Exception as e:
                    print(f"Search criteria failed: {search_criteria}, error: {e}")
                    continue
            print(f"Total unique emails found: {len(all_email_ids)}")

            # Process in mailbox order so progress output is deterministic.
            ordered_ids = sorted(all_email_ids, key=int)
            for i, email_id in enumerate(ordered_ids):
                try:
                    print(f"Processing email {i+1}/{len(ordered_ids)}")
                    record = _fetch_email_record(mail, email_id)
                    if record is None:
                        continue
                    # Check if the keyword is actually present (case-insensitive)
                    searchable = (record["subject"], record["from"], record["content"])
                    if not any(keyword_lower in text.lower() for text in searchable):
                        continue
                    # Double-check date range
                    if not _is_date_in_range(record["date"], start_date, end_date):
                        continue
                    record["content"] = record["content"][:_MAX_CONTENT_CHARS]
                    scraped_emails.append(record)
                except Exception as e:
                    print(f"Error processing email {email_id}: {e}")
                    continue
        finally:
            # Always release the session, including on unexpected errors.
            _logout_quietly(mail)

        # Sort by date (newest first)
        scraped_emails.sort(
            key=lambda x: datetime.strptime(f"{x['date']} {x['time']}", "%d-%b-%Y %H:%M:%S"),
            reverse=True,
        )
        print(f"Successfully processed {len(scraped_emails)} emails containing '{keyword}'")
        return scraped_emails
    except Exception as e:
        print(f"Email text search failed: {e}")
        raise
# Manual smoke test: scrape one week of mail from a single sender.
if __name__ == "__main__":
    try:
        # NOTE(review): the address below looks like an e-mail-obfuscation
        # placeholder rather than a real sender — replace before running.
        emails = scrape_emails_from_sender("[email protected]", "01-Jun-2025", "07-Jun-2025")
    except Exception as e:
        print(f"Test failed: {e}")
    else:
        try:
            print(f"\nFound {len(emails)} emails:")
            # Only the first three results are shown.
            for email in emails[:3]:
                print(f"- {email['date']} {email['time']}: {email['subject']}")
        except Exception as e:
            print(f"Test failed: {e}")