#!/usr/bin/env python3
"""
Gmail API-based Email Scraper with OAuth Authentication

Searches a Gmail mailbox for messages that contain a keyword within a date
range and converts the Gmail API message resources into flat dictionaries.
All dates and times exposed by this module are expressed in India Standard
Time (IST).
"""

import base64
import html
import re
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText
from email.utils import parsedate_to_datetime
from typing import Dict, List, Optional

import googleapiclient.errors

from oauth_manager import oauth_manager
from logger import logger

# India Standard Time is a fixed UTC+05:30 offset with no daylight saving, so
# a fixed-offset tzinfo is exact and does not depend on a tz database being
# installed on the host.
_IST = timezone(timedelta(hours=5, minutes=30), name="IST")

# <script>/<style> elements including their bodies, which are never readable text.
_HTML_INVISIBLE_RE = re.compile(
    r'<(script|style)\b[^>]*>.*?</\1\s*>', re.IGNORECASE | re.DOTALL
)
# Any remaining HTML tag.
_HTML_TAG_RE = re.compile(r'<[^>]+>')
# The charset parameter of a Content-Type header value.
_CHARSET_RE = re.compile(r'charset\s*=\s*"?([^\s";]+)', re.IGNORECASE)


class GmailAPIScraper:
    """Gmail API-based email scraper using OAuth authentication"""

    # Upper bound on messages requested from a single search (one result page).
    MAX_RESULTS = 500
    # Maximum number of characters of body text kept in a parsed email.
    MAX_CONTENT_LENGTH = 2000

    def __init__(self):
        """Initialize the Gmail API scraper"""
        self.oauth_manager = oauth_manager

    def _require_service(self):
        """Return the authenticated Gmail service.

        Raises:
            Exception: If the OAuth manager has no usable Gmail service.
        """
        service = self.oauth_manager.get_gmail_service()
        if not service:
            raise Exception("Not authenticated. Please authenticate first using the setup tool.")
        return service

    def _parse_date_string(self, date_str: str) -> datetime:
        """Parse date string in DD-MMM-YYYY format to a naive datetime object

        Raises:
            ValueError: If ``date_str`` is not in DD-MMM-YYYY format.
        """
        try:
            return datetime.strptime(date_str, "%d-%b-%Y")
        except ValueError as err:
            raise ValueError(f"Invalid date format: {date_str}. Expected DD-MMM-YYYY") from err

    def _format_date_for_query(self, date_obj: datetime) -> str:
        """Format datetime object as a YYYY/MM/DD Gmail query date"""
        return date_obj.strftime("%Y/%m/%d")

    def _declared_charset(self, part: Dict) -> Optional[str]:
        """Return the charset named in the part's Content-Type header, if any"""
        content_type = self._get_header_value(part.get('headers') or [], 'Content-Type')
        match = _CHARSET_RE.search(content_type)
        return match.group(1) if match else None

    def _decode_message_part(self, part: Dict) -> str:
        """Decode the base64url body of a message part into text

        UTF-8 is tried first; if the bytes are not valid UTF-8 the charset
        declared on the part is tried, and as a last resort undecodable bytes
        are dropped.

        Returns:
            The decoded text, or an empty string when the part has no inline data.
        """
        data = part.get('body', {}).get('data', '')
        if not data:
            return ''

        # base64url data may arrive without '=' padding; restore only what is missing.
        padded = data + '=' * (-len(data) % 4)
        decoded_bytes = base64.urlsafe_b64decode(padded)

        try:
            return decoded_bytes.decode('utf-8')
        except UnicodeDecodeError:
            pass

        charset = self._declared_charset(part)
        if charset:
            try:
                return decoded_bytes.decode(charset)
            except (LookupError, UnicodeDecodeError):
                # Unknown codec name or bytes that do not match the declaration.
                pass

        return decoded_bytes.decode('utf-8', errors='ignore')

    @staticmethod
    def _html_to_text(markup: str) -> str:
        """Reduce HTML markup to plain text

        Drops script/style blocks, strips the remaining tags and then resolves
        character references (tags are stripped first so that escaped text
        such as ``&lt;b&gt;`` survives as literal text).
        """
        visible = _HTML_INVISIBLE_RE.sub('', markup)
        return html.unescape(_HTML_TAG_RE.sub('', visible))

    def _collect_text(self, part: Dict, chunks: List[str]) -> None:
        """Append the text of ``part`` and of all its descendants to ``chunks``

        Parts are visited in document order at any nesting depth. Both
        text/plain and text/html parts contribute, so a message carrying both
        renderings yields both.
        """
        mime_type = part.get('mimeType', '')
        if mime_type == 'text/plain':
            chunks.append(self._decode_message_part(part))
        elif mime_type == 'text/html':
            chunks.append(self._html_to_text(self._decode_message_part(part)))

        for child in part.get('parts') or []:
            self._collect_text(child, chunks)

    def _extract_email_content(self, message: Dict) -> str:
        """Extract readable content from Gmail API message

        Returns:
            The concatenated text of every text/plain and text/html part,
            stripped of surrounding whitespace; empty if there is no payload.
        """
        payload = message.get('payload')
        if not payload:
            return ""

        chunks: List[str] = []
        self._collect_text(payload, chunks)
        return "".join(chunks).strip()

    def _get_header_value(self, headers: List[Dict], name: str) -> str:
        """Get header value by name (case-insensitive); empty string if absent"""
        wanted = name.lower()
        for header in headers:
            if header.get('name', '').lower() == wanted:
                return header.get('value', '')
        return ''

    def _resolve_message_datetime(self, message: Dict, date_header: str) -> Optional[datetime]:
        """Determine when a message was sent, as an aware datetime in IST

        The Date header is preferred; the message's ``internalDate`` field
        (epoch milliseconds) is used when the header is missing or unparseable.

        Returns:
            The timestamp in IST, or None when neither source is usable.
        """
        if date_header:
            try:
                sent_at = parsedate_to_datetime(date_header)
                if sent_at.tzinfo is None:
                    # A "-0000" zone parses to a naive datetime that represents
                    # UTC; without this astimezone() would assume host local time.
                    sent_at = sent_at.replace(tzinfo=timezone.utc)
                return sent_at.astimezone(_IST)
            except (TypeError, ValueError, OverflowError) as e:
                logger.warning(f"Failed to parse date {date_header}: {e}")

        internal_ms = message.get('internalDate')
        if internal_ms:
            try:
                return datetime.fromtimestamp(int(internal_ms) / 1000, tz=_IST)
            except (TypeError, ValueError, OverflowError, OSError) as e:
                logger.warning(f"Failed to parse internalDate {internal_ms}: {e}")

        return None

    def _parse_email_message(self, message: Dict, content: Optional[str] = None) -> Dict:
        """Parse Gmail API message into structured format

        Args:
            message: Gmail API message resource fetched with ``format='full'``
            content: Already-extracted body text; extracted from ``message``
                when omitted

        Returns:
            Dict with keys ``date`` (DD-MMM-YYYY, IST), ``time`` (HH:MM:SS, IST),
            ``subject``, ``from``, ``content`` (truncated to
            ``MAX_CONTENT_LENGTH`` characters), ``message_id`` and ``gmail_id``.
        """
        headers = message.get('payload', {}).get('headers', [])

        # Extract headers
        subject = self._get_header_value(headers, 'Subject') or 'No Subject'
        from_header = self._get_header_value(headers, 'From') or 'Unknown Sender'
        date_header = self._get_header_value(headers, 'Date')
        message_id = self._get_header_value(headers, 'Message-ID') or message.get('id', '')

        # Resolve the timestamp; with no usable source fall back to today at midnight
        message_dt = self._resolve_message_datetime(message, date_header)
        if message_dt is not None:
            email_date = message_dt.strftime("%d-%b-%Y")
            email_time = message_dt.strftime("%H:%M:%S")
        else:
            email_date = datetime.now(_IST).strftime("%d-%b-%Y")
            email_time = "00:00:00"

        if content is None:
            content = self._extract_email_content(message)

        return {
            "date": email_date,
            "time": email_time,
            "subject": subject,
            "from": from_header,
            "content": content[:self.MAX_CONTENT_LENGTH],  # Limit content length
            "message_id": message_id,
            "gmail_id": message.get('id', '')
        }

    def search_emails(self, keyword: str, start_date: str, end_date: str) -> List[Dict]:
        """Search emails containing keyword within date range using Gmail API

        Both dates are calendar days in IST and the range is inclusive. At
        most ``MAX_RESULTS`` candidate messages are examined. Messages that
        fail to download or parse are logged and skipped.

        Args:
            keyword: Keyword to search for in emails
            start_date: Start date in DD-MMM-YYYY format
            end_date: End date in DD-MMM-YYYY format

        Returns:
            List of email dictionaries, newest first

        Raises:
            ValueError: If a date is not in DD-MMM-YYYY format.
            Exception: If not authenticated or the Gmail search request fails.
        """
        logger.info(f"Searching emails containing '{keyword}' between {start_date} and {end_date}")

        service = self._require_service()

        try:
            # Parse dates
            start_dt = self._parse_date_string(start_date)
            end_dt = self._parse_date_string(end_date)

            # Gmail interprets YYYY/MM/DD query dates at midnight PST, which
            # would cut off the first hours of the IST start day. Epoch seconds
            # are timezone-unambiguous, so the window is anchored at IST
            # midnight. One second of slack is taken at the start because the
            # exact boundary semantics do not matter: results are re-checked
            # against the date range below.
            after_ts = int(start_dt.replace(tzinfo=_IST).timestamp()) - 1
            before_ts = int((end_dt + timedelta(days=1)).replace(tzinfo=_IST).timestamp())

            # Build search query
            # Gmail API search syntax: https://developers.google.com/gmail/api/guides/filtering
            query = ' '.join([
                f'after:{after_ts}',
                f'before:{before_ts}',
                f'({keyword})',  # Search in all fields
            ])

            logger.info(f"Gmail API query: {query}")

            # Search for messages (first result page only)
            results = service.users().messages().list(
                userId='me',
                q=query,
                maxResults=self.MAX_RESULTS
            ).execute()

            messages = results.get('messages', [])
            logger.info(f"Found {len(messages)} messages")

            if not messages:
                return []

            keyword_lower = keyword.lower()
            total = len(messages)
            scraped_emails = []

            # Fetch full message details
            for position, msg_ref in enumerate(messages, start=1):
                try:
                    logger.info(f"Processing email {position}/{total}")

                    message = service.users().messages().get(
                        userId='me',
                        id=msg_ref['id'],
                        format='full'
                    ).execute()

                    # Keep the untruncated text so the keyword check sees the whole body
                    full_content = self._extract_email_content(message)
                    parsed_email = self._parse_email_message(message, content=full_content)

                    # Verify date range on the IST calendar date
                    email_dt = self._parse_date_string(parsed_email['date'])
                    if not start_dt <= email_dt <= end_dt:
                        continue

                    # Verify keyword presence (case-insensitive)
                    searchable = (parsed_email['subject'], parsed_email['from'], full_content)
                    if any(keyword_lower in text.lower() for text in searchable):
                        scraped_emails.append(parsed_email)

                except googleapiclient.errors.HttpError as e:
                    logger.error(f"Error fetching message {msg_ref['id']}: {e}")
                    continue
                except Exception as e:
                    # Best effort: one bad message must not abort the whole search
                    logger.error(f"Error processing message {msg_ref['id']}: {e}")
                    continue

            # Sort by date (newest first)
            scraped_emails.sort(
                key=lambda x: datetime.strptime(f"{x['date']} {x['time']}", "%d-%b-%Y %H:%M:%S"),
                reverse=True
            )

            logger.info(f"Successfully processed {len(scraped_emails)} emails containing '{keyword}'")
            return scraped_emails

        except googleapiclient.errors.HttpError as e:
            logger.error(f"Gmail API error: {e}")
            raise Exception(f"Gmail API error: {e}") from e
        except Exception as e:
            logger.error(f"Email search failed: {e}")
            raise

    def get_email_by_id(self, message_id: str) -> Optional[Dict]:
        """Get email details by message ID or Gmail ID

        Args:
            message_id: Either the Message-ID header or Gmail message ID

        Returns:
            Email dictionary or None if not found (lookup errors are logged
            and also reported as None)

        Raises:
            Exception: If not authenticated.
        """
        service = self._require_service()

        try:
            # Try to get message directly by Gmail ID first
            try:
                message = service.users().messages().get(
                    userId='me',
                    id=message_id,
                    format='full'
                ).execute()
                return self._parse_email_message(message)
            except googleapiclient.errors.HttpError:
                # Deliberately ignored: the value may be a Message-ID header
                # rather than a Gmail ID, so fall through to the header search.
                pass

            # Search by Message-ID header
            results = service.users().messages().list(
                userId='me',
                q=f'rfc822msgid:{message_id}',
                maxResults=1
            ).execute()

            messages = results.get('messages', [])
            if not messages:
                return None

            # Get the message
            message = service.users().messages().get(
                userId='me',
                id=messages[0]['id'],
                format='full'
            ).execute()
            return self._parse_email_message(message)

        except Exception as e:
            logger.error(f"Failed to get email {message_id}: {e}")
            return None

    def is_authenticated(self) -> bool:
        """Check if user is authenticated"""
        return self.oauth_manager.is_authenticated()

    def get_user_email(self) -> Optional[str]:
        """Get authenticated user's email address"""
        return self.oauth_manager.get_user_email()

    def authenticate(self) -> bool:
        """Trigger interactive authentication"""
        return self.oauth_manager.authenticate_interactive()


# Global scraper instance
gmail_scraper = GmailAPIScraper()