#!/usr/bin/env python3
"""
Enhanced Email Scraper with Intelligent Caching
"""

import os
import imaplib
import json
from email import message_from_bytes
from email.header import decode_header, make_header
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
from dotenv import load_dotenv
from zoneinfo import ZoneInfo
from email.utils import parsedate_to_datetime
from typing import Dict, Iterable, List, Optional, Set, Tuple
from logger import logger

load_dotenv()

# Email credentials
APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")

print("EMAIL_ID: ", EMAIL_ID)

EMAIL_DB_FILE = "email_db.json"


def validate_email_setup():
    """Check the environment before a search and report problems on stdout.

    Only ``OPENAI_API_KEY`` is checked here; the IMAP credentials themselves
    are verified later by ``_imap_connect``.

    Returns:
        True when no issue was found, False otherwise.
    """
    print("=== Email Setup Validation ===")

    issues = []

    if not os.getenv("OPENAI_API_KEY"):
        issues.append("OPENAI_API_KEY not set (needed for query processing)")

    if issues:
        print("❌ Issues found:")
        for issue in issues:
            print(f" - {issue}")
        return False

    print("✅ All credentials look good!")
    return True


def _mask_email(email_id: str) -> str:
    """Return a partially hidden form of an address for debug output."""
    domain = email_id.split('@')[1] if '@' in email_id else 'INVALID'
    return f"{email_id[:5]}...@{domain}"


def _close_mailbox(mail: imaplib.IMAP4) -> None:
    """Log out of the IMAP session, reporting (not raising) cleanup errors."""
    try:
        mail.logout()
    except (imaplib.IMAP4.error, OSError) as e:
        # Cleanup must never hide the result or the original exception.
        print(f"Warning: IMAP logout failed: {e}")


def _open_mailbox(email_id: str, app_password: str) -> imaplib.IMAP4_SSL:
    """Connect to Gmail over SSL, log in and select "[Gmail]/All Mail".

    Args:
        email_id: Gmail address used for login.
        app_password: Gmail app password used for login.

    Returns:
        The authenticated connection with the mailbox selected.

    Raises:
        imaplib.IMAP4.error: If login fails or the mailbox cannot be selected.
        OSError: If the network connection cannot be established.
    """
    print("🔄 Attempting IMAP SSL connection to imap.gmail.com:993...")
    mail = imaplib.IMAP4_SSL("imap.gmail.com")
    print("✅ SSL connection established")

    try:
        print("🔄 Attempting login...")
        result = mail.login(email_id, app_password)
        print(f"✅ Login successful: {result}")

        print("🔄 Selecting mailbox: [Gmail]/All Mail...")
        result = mail.select('"[Gmail]/All Mail"')
        # select() reports a missing mailbox through its status instead of
        # raising; without this check every later SEARCH would fail quietly.
        if result[0] != 'OK':
            raise imaplib.IMAP4.error(
                f"Could not select [Gmail]/All Mail: {result}"
            )
        print(f"✅ Mailbox selected: {result}")
    except Exception:
        # Do not leave the socket open when login/select fails.
        _close_mailbox(mail)
        raise

    return mail


def _imap_connect():
    """Connect to Gmail IMAP server using EMAIL_ID / APP_PASSWORD.

    Prints step-by-step diagnostics and troubleshooting hints.

    Returns:
        An authenticated connection with "[Gmail]/All Mail" selected.

    Raises:
        Exception: If either credential is missing, or if connecting,
            logging in or selecting the mailbox fails (re-raised as is).
    """
    print("=== IMAP Connection Debug ===")

    # Check if environment variables are loaded
    print(f"EMAIL_ID loaded: {'✅ Yes' if EMAIL_ID else '❌ No (None/Empty)'}")
    print(f"APP_PASSWORD loaded: {'✅ Yes' if APP_PASSWORD else '❌ No (None/Empty)'}")

    if EMAIL_ID:
        print(f"Email ID: {_mask_email(EMAIL_ID)}")

    if not EMAIL_ID or not APP_PASSWORD:
        error_msg = "Missing credentials in environment variables!"
        print(f"❌ {error_msg}")
        raise Exception(error_msg)

    try:
        mail = _open_mailbox(EMAIL_ID, APP_PASSWORD)
        print("=== IMAP Connection Successful ===")
        return mail
    except imaplib.IMAP4.error as e:
        print(f"❌ IMAP Error: {e}")
        print("💡 Possible causes:")
        print(" - App Password is incorrect or expired")
        print(" - 2FA not enabled on Gmail account")
        print(" - IMAP access not enabled in Gmail settings")
        print(" - Gmail account locked or requires security verification")
        raise
    except Exception as e:
        print(f"❌ Connection Error: {e}")
        print("💡 Possible causes:")
        print(" - Network connectivity issues")
        print(" - Gmail IMAP server temporarily unavailable")
        print(" - Firewall blocking IMAP port 993")
        raise


def _decode_part(part) -> Optional[str]:
    """Decode one message part to text, or return None if it has no payload.

    The charset declared on the part is honoured; an unknown charset name
    falls back to UTF-8. Undecodable bytes are dropped.
    """
    payload = part.get_payload(decode=True)
    if payload is None:
        return None
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        return payload.decode("utf-8", errors="ignore")


def _email_to_clean_text(msg):
    """Extract clean text from email message.

    HTML content is preferred over plain text; when HTML is present it is
    stripped of ``<script>``/``<style>`` elements and reduced to its text.
    For multipart messages the last part of each type wins.

    Returns:
        The extracted text, or "" when the message has no usable content.
    """
    html_content = None
    text_content = None

    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type not in ("text/html", "text/plain"):
                continue
            decoded = _decode_part(part)
            if decoded is None:
                continue
            if content_type == "text/html":
                html_content = decoded
            else:
                text_content = decoded
    else:
        # Non-multipart message: anything that is not HTML is treated as text
        decoded = _decode_part(msg)
        if decoded is not None:
            if msg.get_content_type() == "text/html":
                html_content = decoded
            else:
                text_content = decoded

    # Clean HTML content
    if html_content:
        soup = BeautifulSoup(html_content, "html.parser")
        # Remove script and style elements
        for element in soup(["script", "style"]):
            element.decompose()
        return soup.get_text(separator=' ', strip=True)

    if text_content:
        return text_content.strip()

    return ""


def _load_email_db() -> Dict:
    """Load email database from file.

    Returns:
        The parsed JSON content of EMAIL_DB_FILE, or an empty dict when the
        file is missing, unreadable or not valid JSON.
    """
    if not os.path.exists(EMAIL_DB_FILE):
        return {}

    try:
        with open(EMAIL_DB_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except (json.JSONDecodeError, IOError):
        print(f"Warning: Could not load {EMAIL_DB_FILE}, starting with empty database")
        return {}


def _save_email_db(db: Dict) -> None:
    """Save email database to file.

    Raises:
        IOError: If EMAIL_DB_FILE cannot be written (after printing the error).
    """
    try:
        with open(EMAIL_DB_FILE, "w", encoding="utf-8") as f:
            json.dump(db, f, indent=2)
    except IOError as e:
        print(f"Error saving database: {e}")
        raise


def _date_to_imap_format(date_str: str) -> str:
    """Validate a DD-MMM-YYYY date and return it in IMAP date format.

    Raises:
        ValueError: If ``date_str`` is not a valid DD-MMM-YYYY date.
    """
    try:
        dt = datetime.strptime(date_str, "%d-%b-%Y")
    except ValueError as err:
        raise ValueError(
            f"Invalid date format: {date_str}. Expected DD-MMM-YYYY"
        ) from err
    return dt.strftime("%d-%b-%Y")


def _is_date_in_range(email_date: str, start_date: str, end_date: str) -> bool:
    """Check if email date is within the specified range (both ends inclusive).

    All three arguments are DD-MMM-YYYY strings; any unparsable value makes
    the result False.
    """
    try:
        email_dt = datetime.strptime(email_date, "%d-%b-%Y")
        start_dt = datetime.strptime(start_date, "%d-%b-%Y")
        end_dt = datetime.strptime(end_date, "%d-%b-%Y")
    except ValueError:
        return False
    return start_dt <= email_dt <= end_dt


def _imap_quote(value: str) -> str:
    """Return ``value`` as an IMAP quoted string.

    Backslashes and double quotes are escaped, and CR/LF (which cannot
    appear inside a quoted string) are replaced by spaces, so a keyword can
    never terminate the string early or start a new command.
    """
    cleaned = value.replace("\r", " ").replace("\n", " ")
    escaped = cleaned.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _build_search_criteria(keyword: str, start_date: str, end_date: str) -> List[str]:
    """Build the FROM / SUBJECT / BODY search strings for a date window.

    Raises:
        ValueError: If either date is not a valid DD-MMM-YYYY date.
    """
    start_imap = _date_to_imap_format(start_date)

    # Add one day to end_date for BEFORE criteria (IMAP BEFORE is exclusive)
    end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
    end_imap = end_dt.strftime("%d-%b-%Y")

    quoted_keyword = _imap_quote(keyword)
    window = f'SINCE "{start_imap}" BEFORE "{end_imap}"'
    return [
        f'{field} {quoted_keyword} {window}'
        for field in ("FROM", "SUBJECT", "BODY")
    ]


def _search_email_ids(mail: imaplib.IMAP4, criteria_list: Iterable[str]) -> Set[bytes]:
    """Run every search and return the union of the matching message ids.

    A failing search is reported and skipped so the others can still run.
    """
    all_email_ids: Set[bytes] = set()

    for search_criteria in criteria_list:
        try:
            print(f"IMAP search: {search_criteria}")
            status, data = mail.search(None, search_criteria)

            if status == 'OK' and data[0]:
                found_ids = data[0].split()
                all_email_ids.update(found_ids)
                print(f"Found {len(found_ids)} emails with this criteria")
        except Exception as e:
            print(f"Search criteria failed: {search_criteria}, error: {e}")
            continue

    return all_email_ids


def _decode_header_value(value) -> str:
    """Return a header as readable text, decoding RFC 2047 encoded words.

    Falls back to ``str(value)`` when the header cannot be decoded.
    """
    try:
        return str(make_header(decode_header(value)))
    except Exception:
        # Best effort: a malformed header must not drop the whole email.
        return str(value)


def _email_date_and_time(date_header: str) -> Tuple[str, str]:
    """Convert a Date header to (DD-MMM-YYYY, HH:MM:SS) in Asia/Kolkata time.

    A missing or unparsable header falls back to today's date at 00:00:00.
    """
    if date_header:
        try:
            dt_obj = parsedate_to_datetime(date_header)
            # Convert to IST
            ist_dt = dt_obj.astimezone(ZoneInfo("Asia/Kolkata"))
            return ist_dt.strftime("%d-%b-%Y"), ist_dt.strftime("%H:%M:%S")
        except Exception:
            # Deliberate best-effort fallback below.
            pass

    return datetime.today().strftime("%d-%b-%Y"), "00:00:00"


def _build_email_record(msg, keyword: str, start_date: str, end_date: str,
                        fallback_id: str) -> Optional[Dict]:
    """Turn a parsed message into a result dict, or None if it is filtered out.

    A message is kept only when ``keyword`` occurs (case-insensitively) in its
    subject, sender or cleaned content and its date lies within the range.
    ``fallback_id`` is used to build a message id when the header is absent.
    """
    subject = _decode_header_value(msg.get("Subject", "No Subject"))
    from_header = _decode_header_value(msg.get("From", "Unknown Sender"))
    content = _email_to_clean_text(msg)

    # Check if the keyword is actually present (case-insensitive)
    keyword_lower = keyword.lower()
    if not any(keyword_lower in text.lower()
               for text in (subject, from_header, content)):
        return None

    email_date, email_time = _email_date_and_time(str(msg.get("Date", "")))

    # Double-check date range
    if not _is_date_in_range(email_date, start_date, end_date):
        return None

    return {
        "date": email_date,
        "time": email_time,
        "subject": subject,
        "from": from_header,
        "content": content[:2000],  # Limit content length
        "message_id": str(msg.get("Message-ID", f"missing-{fallback_id}")),
    }


def _fetch_matching_emails(mail: imaplib.IMAP4, email_ids: List[bytes], keyword: str,
                           start_date: str, end_date: str) -> List[Dict]:
    """Fetch each message id and collect the records that pass the filters.

    Errors on a single message are reported and that message is skipped.
    """
    scraped_emails = []
    total = len(email_ids)

    for position, imap_id in enumerate(email_ids, start=1):
        try:
            print(f"Processing email {position}/{total}")

            status, msg_data = mail.fetch(imap_id, "(RFC822)")
            if status != 'OK':
                continue

            msg = message_from_bytes(msg_data[0][1])
            record = _build_email_record(
                msg, keyword, start_date, end_date, imap_id.decode()
            )
            if record is not None:
                scraped_emails.append(record)
        except Exception as e:
            print(f"Error processing email {imap_id}: {e}")
            continue

    return scraped_emails


def _run_text_search(mail: imaplib.IMAP4, keyword: str, start_date: str,
                     end_date: str) -> List[Dict]:
    """Search, fetch and filter emails on an open connection, then log out.

    The connection is always logged out, including when an error occurs.

    Returns:
        Matching email records sorted newest first.
    """
    try:
        # Search multiple criteria and combine results
        criteria_list = _build_search_criteria(keyword, start_date, end_date)
        all_email_ids = _search_email_ids(mail, criteria_list)
        print(f"Total unique emails found: {len(all_email_ids)}")

        # Numeric order keeps processing deterministic (sets are unordered).
        ordered_ids = sorted(all_email_ids, key=int)
        scraped_emails = _fetch_matching_emails(
            mail, ordered_ids, keyword, start_date, end_date
        )
    finally:
        _close_mailbox(mail)

    # Sort by date (newest first)
    scraped_emails.sort(
        key=lambda x: datetime.strptime(f"{x['date']} {x['time']}", "%d-%b-%Y %H:%M:%S"),
        reverse=True,
    )

    print(f"Successfully processed {len(scraped_emails)} emails containing '{keyword}'")
    return scraped_emails


def scrape_emails_by_text_search(keyword: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails containing a specific keyword (like company name) within date range.
    Uses IMAP text search on the FROM, SUBJECT and BODY fields, with the
    credentials taken from the environment.

    Args:
        keyword: Keyword to search for
        start_date: Start date in DD-MMM-YYYY format
        end_date: End date in DD-MMM-YYYY format

    Returns:
        A list of dicts with the keys "date", "time", "subject", "from",
        "content" (at most 2000 characters) and "message_id", newest first.

    Raises:
        Exception: If setup validation fails, or any connection / search
            error occurs (re-raised after being printed).
    """
    print(f"Searching emails containing '{keyword}' between {start_date} and {end_date}")

    # Validate setup first
    if not validate_email_setup():
        raise Exception("Email setup validation failed. Please check your .env file and credentials.")

    try:
        mail = _imap_connect()
        return _run_text_search(mail, keyword, start_date, end_date)
    except Exception as e:
        print(f"Email text search failed: {e}")
        raise


def scrape_emails_by_text_search_with_credentials(email_id: str, app_password: str, keyword: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails containing a specific keyword (like company name) within date range.
    Uses provided credentials instead of environment variables.

    Args:
        email_id: Gmail address
        app_password: Gmail app password
        keyword: Keyword to search for
        start_date: Start date in DD-MMM-YYYY format
        end_date: End date in DD-MMM-YYYY format

    Returns:
        A list of dicts with the keys "date", "time", "subject", "from",
        "content" (at most 2000 characters) and "message_id", newest first.

    Raises:
        Exception: If a credential is missing, or any connection / search
            error occurs (re-raised after being printed).
    """
    print(f"Searching emails containing '{keyword}' between {start_date} and {end_date}")

    if not email_id or not app_password:
        raise Exception("Email ID and App Password are required")

    try:
        # Connect using provided credentials
        print("=== IMAP Connection Debug ===")
        print(f"Email ID: {_mask_email(email_id)}")
        print("App password: [PROVIDED]")

        mail = _open_mailbox(email_id, app_password)
        return _run_text_search(mail, keyword, start_date, end_date)
    except Exception as e:
        print(f"Email text search failed: {e}")
        raise


# Test the scraper
if __name__ == "__main__":
    # Test scraping
    try:
        emails = scrape_emails_by_text_search(
            "noreply@example.com",
            "01-Jun-2025",
            "07-Jun-2025"
        )

        print(f"\nFound {len(emails)} emails:")
        for email in emails[:3]:  # Show first 3
            print(f"- {email['date']} {email['time']}: {email['subject']}")

    except Exception as e:
        print(f"Test failed: {e}")