#!/usr/bin/env python3
"""
Enhanced Email Scraper with Intelligent Caching

Scrapes Gmail over IMAP, caches per-sender results in a local JSON file
(EMAIL_DB_FILE), and supports keyword search across FROM/SUBJECT/BODY.
All dates use the DD-MMM-YYYY format (e.g. 01-Jun-2025); email timestamps
are converted to IST (Asia/Kolkata).
"""
import os
import imaplib
import json
from email import message_from_bytes
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
from dotenv import load_dotenv
from zoneinfo import ZoneInfo
from email.utils import parsedate_to_datetime
from typing import List, Dict, Tuple

load_dotenv()

# Email credentials (read from environment / .env)
APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")

print("EMAIL_ID: ", EMAIL_ID)

# Local JSON cache of previously scraped emails, keyed by sender address.
EMAIL_DB_FILE = "email_db.json"


def validate_email_setup():
    """Validate email setup and credentials.

    Returns:
        bool: True when all required configuration is present, else False.
    """
    print("=== Email Setup Validation ===")

    issues = []

    if not os.getenv("OPENAI_API_KEY"):
        issues.append("OPENAI_API_KEY not set (needed for query processing)")

    if issues:
        print("❌ Issues found:")
        for issue in issues:
            print(f"   - {issue}")
        return False
    else:
        print("✅ All credentials look good!")
        return True


def _imap_connect():
    """Connect and log in to Gmail IMAP, selecting "[Gmail]/All Mail".

    Returns:
        imaplib.IMAP4_SSL: an authenticated connection with the mailbox selected.

    Raises:
        Exception: when credentials are missing.
        imaplib.IMAP4.error: on authentication/mailbox failures.
    """
    print("=== IMAP Connection Debug ===")

    # Check if environment variables are loaded
    print(f"EMAIL_ID loaded: {'✅ Yes' if EMAIL_ID else '❌ No (None/Empty)'}")
    print(f"APP_PASSWORD loaded: {'✅ Yes' if APP_PASSWORD else '❌ No (None/Empty)'}")

    if EMAIL_ID:
        print(f"Email ID: {EMAIL_ID[:5]}...@{EMAIL_ID.split('@')[1] if '@' in EMAIL_ID else 'INVALID'}")

    if not EMAIL_ID or not APP_PASSWORD:
        error_msg = "Missing credentials in environment variables!"
        print(f"❌ {error_msg}")
        raise Exception(error_msg)

    try:
        print("🔄 Attempting IMAP SSL connection to imap.gmail.com:993...")
        mail = imaplib.IMAP4_SSL("imap.gmail.com")
        print("✅ SSL connection established")

        print("🔄 Attempting login...")
        result = mail.login(EMAIL_ID, APP_PASSWORD)
        print(f"✅ Login successful: {result}")

        print("🔄 Selecting mailbox: [Gmail]/All Mail...")
        result = mail.select('"[Gmail]/All Mail"')
        print(f"✅ Mailbox selected: {result}")

        print("=== IMAP Connection Successful ===")
        return mail

    except imaplib.IMAP4.error as e:
        print(f"❌ IMAP Error: {e}")
        print("💡 Possible causes:")
        print("   - App Password is incorrect or expired")
        print("   - 2FA not enabled on Gmail account")
        print("   - IMAP access not enabled in Gmail settings")
        print("   - Gmail account locked or requires security verification")
        raise
    except Exception as e:
        print(f"❌ Connection Error: {e}")
        print("💡 Possible causes:")
        print("   - Network connectivity issues")
        print("   - Gmail IMAP server temporarily unavailable")
        print("   - Firewall blocking IMAP port 993")
        raise


def _email_to_clean_text(msg) -> str:
    """Extract clean text from an email message.

    Prefers the HTML part (stripped of script/style tags) over plain text.
    Returns "" when no decodable text part exists.
    """
    html_content = None
    text_content = None

    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type not in ("text/html", "text/plain"):
                continue
            payload = part.get_payload(decode=True)
            if payload is None:
                # e.g. container parts / undecodable parts — nothing to extract
                continue
            decoded = payload.decode(errors="ignore")
            if content_type == "text/html":
                html_content = decoded
            else:
                text_content = decoded
    else:
        # Non-multipart message
        content_type = msg.get_content_type()
        payload = msg.get_payload(decode=True)
        if payload is not None:
            content = payload.decode(errors="ignore")
            if content_type == "text/html":
                html_content = content
            else:
                text_content = content

    if html_content:
        soup = BeautifulSoup(html_content, "html.parser")
        # Remove script and style elements so only readable text remains.
        for script in soup(["script", "style"]):
            script.decompose()
        return soup.get_text(separator=' ', strip=True)
    elif text_content:
        return text_content.strip()
    else:
        return ""


def _load_email_db() -> Dict:
    """Load the email cache from EMAIL_DB_FILE; empty dict if missing/corrupt."""
    if not os.path.exists(EMAIL_DB_FILE):
        return {}
    try:
        with open(EMAIL_DB_FILE, "r") as f:
            return json.load(f)
    except (json.JSONDecodeError, IOError):
        print(f"Warning: Could not load {EMAIL_DB_FILE}, starting with empty database")
        return {}


def _save_email_db(db: Dict):
    """Persist the email cache to EMAIL_DB_FILE.

    Raises:
        IOError: when the file cannot be written (re-raised after logging).
    """
    try:
        with open(EMAIL_DB_FILE, "w") as f:
            json.dump(db, f, indent=2)
    except IOError as e:
        print(f"Error saving database: {e}")
        raise


def _date_to_imap_format(date_str: str) -> str:
    """Validate and normalize a DD-MMM-YYYY date string for IMAP SEARCH.

    Raises:
        ValueError: if date_str is not in DD-MMM-YYYY format.
    """
    try:
        dt = datetime.strptime(date_str, "%d-%b-%Y")
        return dt.strftime("%d-%b-%Y")
    except ValueError:
        raise ValueError(f"Invalid date format: {date_str}. Expected DD-MMM-YYYY")


def _is_date_in_range(email_date: str, start_date: str, end_date: str) -> bool:
    """Check whether email_date (DD-MMM-YYYY) lies within [start_date, end_date].

    Returns False for any unparsable date rather than raising.
    """
    try:
        email_dt = datetime.strptime(email_date, "%d-%b-%Y")
        start_dt = datetime.strptime(start_date, "%d-%b-%Y")
        end_dt = datetime.strptime(end_date, "%d-%b-%Y")
        return start_dt <= email_dt <= end_dt
    except ValueError:
        return False


def _parse_email_date(msg) -> Tuple[str, str]:
    """Extract (date, time) strings in IST from a message's Date header.

    Falls back to today's date and "00:00:00" when the header is missing
    or unparsable (mirrors the original scraper's behavior).
    """
    date_header = msg.get("Date", "")
    if date_header:
        try:
            dt_obj = parsedate_to_datetime(date_header)
            # Convert to IST
            ist_dt = dt_obj.astimezone(ZoneInfo("Asia/Kolkata"))
            return ist_dt.strftime("%d-%b-%Y"), ist_dt.strftime("%H:%M:%S")
        except Exception:
            pass
    return datetime.today().strftime("%d-%b-%Y"), "00:00:00"


def scrape_emails_from_sender(sender_email: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails from specific sender within date range
    Uses intelligent caching to avoid re-scraping
    """
    print(f"Scraping emails from {sender_email} between {start_date} and {end_date}")

    # Load existing database
    db = _load_email_db()
    sender_email = sender_email.lower().strip()

    # Serve from cache when this sender was already scraped today.
    if sender_email in db:
        cached_emails = db[sender_email].get("emails", [])
        filtered_emails = [
            email for email in cached_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]
        last_scraped = db[sender_email].get("last_scraped", "01-Jan-2020")
        today = datetime.today().strftime("%d-%b-%Y")
        if last_scraped == today and filtered_emails:
            print(f"Using cached emails (last scraped: {last_scraped})")
            return filtered_emails

    # Need to scrape emails
    try:
        mail = _imap_connect()
        try:
            start_imap = _date_to_imap_format(start_date)
            # Add one day to end_date for BEFORE criteria (IMAP BEFORE is exclusive)
            end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
            end_imap = end_dt.strftime("%d-%b-%Y")

            search_criteria = f'(FROM "{sender_email}") SINCE "{start_imap}" BEFORE "{end_imap}"'
            print(f"IMAP search: {search_criteria}")

            status, data = mail.search(None, search_criteria)
            if status != 'OK':
                raise Exception(f"IMAP search failed: {status}")

            email_ids = data[0].split()
            print(f"Found {len(email_ids)} emails")

            scraped_emails = []
            for i, email_id in enumerate(email_ids):
                try:
                    print(f"Processing email {i+1}/{len(email_ids)}")

                    status, msg_data = mail.fetch(email_id, "(RFC822)")
                    if status != 'OK':
                        continue

                    msg = message_from_bytes(msg_data[0][1])
                    subject = msg.get("Subject", "No Subject")
                    content = _email_to_clean_text(msg)
                    email_date, email_time = _parse_email_date(msg)

                    # Message-ID is used to deduplicate against cached entries.
                    message_id = msg.get("Message-ID", f"missing-{email_id.decode()}")

                    scraped_emails.append({
                        "date": email_date,
                        "time": email_time,
                        "subject": subject,
                        "content": content[:2000],  # Limit content length
                        "message_id": message_id
                    })
                except Exception as e:
                    print(f"Error processing email {email_id}: {e}")
                    continue
        finally:
            # Always release the connection, even when search/fetch failed.
            try:
                mail.logout()
            except Exception:
                pass

        # Update database
        if sender_email not in db:
            db[sender_email] = {"emails": [], "last_scraped": ""}

        # Merge with existing emails (avoid duplicates)
        existing_emails = db[sender_email].get("emails", [])
        existing_ids = {email.get("message_id") for email in existing_emails}
        new_emails = [
            email for email in scraped_emails
            if email["message_id"] not in existing_ids
        ]

        db[sender_email]["emails"] = existing_emails + new_emails
        db[sender_email]["last_scraped"] = datetime.today().strftime("%d-%b-%Y")
        _save_email_db(db)

        # Return filtered results
        all_emails = db[sender_email]["emails"]
        filtered_emails = [
            email for email in all_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]

        print(f"Scraped {len(new_emails)} new emails, returning {len(filtered_emails)} in date range")
        return filtered_emails

    except Exception as e:
        print(f"Email scraping failed: {e}")
        raise


def scrape_emails_by_text_search(keyword: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails containing a specific keyword (like company name) within date range.
    Uses IMAP text search to find emails from senders containing the keyword.
    """
    print(f"Searching emails containing '{keyword}' between {start_date} and {end_date}")

    # Validate setup first
    if not validate_email_setup():
        raise Exception("Email setup validation failed. Please check your .env file and credentials.")

    try:
        mail = _imap_connect()
        try:
            start_imap = _date_to_imap_format(start_date)
            # Add one day to end_date for BEFORE criteria (IMAP BEFORE is exclusive)
            end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
            end_imap = end_dt.strftime("%d-%b-%Y")

            # Search FROM, SUBJECT and BODY separately, then union the hits.
            search_criteria_list = [
                f'FROM "{keyword}" SINCE "{start_imap}" BEFORE "{end_imap}"',
                f'SUBJECT "{keyword}" SINCE "{start_imap}" BEFORE "{end_imap}"',
                f'BODY "{keyword}" SINCE "{start_imap}" BEFORE "{end_imap}"'
            ]

            all_email_ids = set()
            for search_criteria in search_criteria_list:
                try:
                    print(f"IMAP search: {search_criteria}")
                    status, data = mail.search(None, search_criteria)
                    if status == 'OK' and data[0]:
                        email_ids = data[0].split()
                        all_email_ids.update(email_ids)
                        print(f"Found {len(email_ids)} emails with this criteria")
                except Exception as e:
                    print(f"Search criteria failed: {search_criteria}, error: {e}")
                    continue

            print(f"Total unique emails found: {len(all_email_ids)}")

            scraped_emails = []
            keyword_lower = keyword.lower()  # hoisted out of the loop
            for i, email_id in enumerate(all_email_ids):
                try:
                    print(f"Processing email {i+1}/{len(all_email_ids)}")

                    status, msg_data = mail.fetch(email_id, "(RFC822)")
                    if status != 'OK':
                        continue

                    msg = message_from_bytes(msg_data[0][1])
                    subject = msg.get("Subject", "No Subject")
                    from_header = msg.get("From", "Unknown Sender")
                    content = _email_to_clean_text(msg)

                    # Server-side search can be loose; confirm keyword locally
                    # (case-insensitive) before accepting the message.
                    if not any(keyword_lower in text.lower() for text in [subject, from_header, content]):
                        continue

                    email_date, email_time = _parse_email_date(msg)

                    # Double-check date range
                    if not _is_date_in_range(email_date, start_date, end_date):
                        continue

                    message_id = msg.get("Message-ID", f"missing-{email_id.decode()}")

                    scraped_emails.append({
                        "date": email_date,
                        "time": email_time,
                        "subject": subject,
                        "from": from_header,
                        "content": content[:2000],  # Limit content length
                        "message_id": message_id
                    })
                except Exception as e:
                    print(f"Error processing email {email_id}: {e}")
                    continue
        finally:
            # Always release the connection, even when search/fetch failed.
            try:
                mail.logout()
            except Exception:
                pass

        # Sort by date (newest first)
        scraped_emails.sort(
            key=lambda x: datetime.strptime(f"{x['date']} {x['time']}", "%d-%b-%Y %H:%M:%S"),
            reverse=True
        )

        print(f"Successfully processed {len(scraped_emails)} emails containing '{keyword}'")
        return scraped_emails

    except Exception as e:
        print(f"Email text search failed: {e}")
        raise


# Test the scraper
if __name__ == "__main__":
    try:
        emails = scrape_emails_from_sender(
            "noreply@example.com",
            "01-Jun-2025",
            "07-Jun-2025"
        )
        print(f"\nFound {len(emails)} emails:")
        for email in emails[:3]:  # Show first 3
            print(f"- {email['date']} {email['time']}: {email['subject']}")
    except Exception as e:
        print(f"Test failed: {e}")