|
|
|
""" |
|
Gmail API-based Email Scraper with OAuth Authentication |
|
""" |
|
|
|
import base64 |
|
import re |
|
from datetime import datetime, timedelta |
|
from typing import List, Dict, Optional |
|
from email.mime.text import MIMEText |
|
import googleapiclient.errors |
|
from oauth_manager import oauth_manager |
|
from logger import logger |
|
|
|
class GmailAPIScraper:
    """Gmail API-based email scraper using OAuth authentication.

    Obtains an authenticated Gmail service from the module's ``oauth_manager``
    and offers keyword/date-range search plus single-message lookup. Parsed
    messages are plain dicts with the keys ``date`` (DD-MMM-YYYY), ``time``
    (HH:MM:SS), ``subject``, ``from``, ``content``, ``message_id`` and
    ``gmail_id``. When the Date header parses, date/time are converted to
    ``DISPLAY_TZ``.
    """

    # Maximum number of body-text characters kept per parsed email.
    MAX_CONTENT_CHARS = 2000
    # Page size requested per users.messages.list call.
    PAGE_SIZE = 500
    # Time zone that parsed Date headers are converted to.
    DISPLAY_TZ = "Asia/Kolkata"
    # Crude HTML tag matcher used to turn HTML bodies into text.
    _TAG_RE = re.compile(r'<[^>]+>')

    def __init__(self):
        """Initialize the Gmail API scraper with the shared OAuth manager."""
        self.oauth_manager = oauth_manager

    def _parse_date_string(self, date_str: str) -> datetime:
        """Parse a DD-MMM-YYYY string (e.g. ``05-Mar-2024``) into a naive datetime.

        The month abbreviation is matched with ``%b``, i.e. the current locale.

        Raises:
            ValueError: If ``date_str`` does not match DD-MMM-YYYY.
        """
        try:
            return datetime.strptime(date_str, "%d-%b-%Y")
        except ValueError:
            # Replace strptime's message with one naming the expected format.
            raise ValueError(f"Invalid date format: {date_str}. Expected DD-MMM-YYYY") from None

    def _format_date_for_query(self, date_obj: datetime) -> str:
        """Format a datetime as YYYY/MM/DD for Gmail ``after:``/``before:`` operators."""
        return date_obj.strftime("%Y/%m/%d")

    def _decode_message_part(self, part: Dict) -> str:
        """Decode the base64url ``body.data`` of a message part to text.

        Returns an empty string when the part carries no inline ``data``.
        Bytes that are not valid UTF-8 are dropped instead of raising.
        """
        data = (part.get('body') or {}).get('data', '')
        if not data:
            return ''
        # The data may arrive without '=' padding; add only what is missing.
        data += '=' * (-len(data) % 4)
        decoded_bytes = base64.urlsafe_b64decode(data)
        return decoded_bytes.decode('utf-8', errors='ignore')

    def _html_to_text(self, html_content: str) -> str:
        """Crudely convert HTML to text: strip tags, then unescape entities."""
        from html import unescape

        # Unescape after stripping so "&lt;b&gt;" text is not mistaken for a tag.
        return unescape(self._TAG_RE.sub('', html_content))

    def _part_to_text(self, part: Dict) -> str:
        """Return the readable text of ``part``, recursing into multipart containers.

        ``text/plain`` is decoded as-is and ``text/html`` is converted to text.
        For ``multipart/alternative`` only one rendering is used (plain text
        preferred), because the alternatives represent the same content and
        concatenating them duplicates it. Other containers join the text of
        all their children with newlines. Parts of any other type yield ''.
        """
        mime_type = part.get('mimeType', '')
        if mime_type == 'text/plain':
            return self._decode_message_part(part)
        if mime_type == 'text/html':
            return self._html_to_text(self._decode_message_part(part))

        children = part.get('parts') or []
        if mime_type == 'multipart/alternative':
            # Stable sort: text/plain children first, original order otherwise.
            ordered = sorted(children, key=lambda child: child.get('mimeType') != 'text/plain')
            for child in ordered:
                text = self._part_to_text(child)
                if text.strip():
                    return text
            return ''

        texts = (self._part_to_text(child) for child in children)
        # A separator keeps words from adjacent parts from merging.
        return '\n'.join(text for text in texts if text)

    def _extract_email_content(self, message: Dict) -> str:
        """Extract readable, whitespace-trimmed body text from a Gmail API message."""
        payload = message.get('payload')
        if not payload:
            return ''
        return self._part_to_text(payload).strip()

    def _get_header_value(self, headers: List[Dict], name: str) -> str:
        """Return the value of the first header named ``name`` (case-insensitive), or ''."""
        for header in headers:
            if header.get('name', '').lower() == name.lower():
                return header.get('value', '')
        return ''

    def _parse_email_message(self, message: Dict,
                             max_content_chars: Optional[int] = MAX_CONTENT_CHARS) -> Dict:
        """Parse a Gmail API message into the structured dict used by this module.

        Args:
            message: Message resource as returned by ``users.messages.get``.
            max_content_chars: Truncate ``content`` to this many characters;
                ``None`` keeps the full text.

        Returns:
            Dict with ``date``, ``time``, ``subject``, ``from``, ``content``,
            ``message_id`` (Message-ID header, falling back to the Gmail ID)
            and ``gmail_id``. If the Date header is missing or unparseable,
            ``date`` is today's local date and ``time`` is ``00:00:00``.
        """
        headers = (message.get('payload') or {}).get('headers', [])
        gmail_id = message.get('id', '')

        subject = self._get_header_value(headers, 'Subject') or 'No Subject'
        from_header = self._get_header_value(headers, 'From') or 'Unknown Sender'
        date_header = self._get_header_value(headers, 'Date')
        message_id = self._get_header_value(headers, 'Message-ID') or gmail_id

        # Fallbacks used when the Date header is missing or cannot be parsed.
        email_date = datetime.now().strftime("%d-%b-%Y")
        email_time = "00:00:00"

        if date_header:
            try:
                from datetime import timezone
                from email.utils import parsedate_to_datetime
                from zoneinfo import ZoneInfo

                dt_obj = parsedate_to_datetime(date_header)
                if dt_obj.tzinfo is None:
                    # A "-0000" offset yields a naive datetime; the time is UTC,
                    # so don't let astimezone() reinterpret it as host-local time.
                    dt_obj = dt_obj.replace(tzinfo=timezone.utc)
                local_dt = dt_obj.astimezone(ZoneInfo(self.DISPLAY_TZ))
                email_date = local_dt.strftime("%d-%b-%Y")
                email_time = local_dt.strftime("%H:%M:%S")
            except Exception as e:
                # Best-effort: a bad Date header must not drop the whole message.
                logger.warning(f"Failed to parse date {date_header}: {e}")

        content = self._extract_email_content(message)
        if max_content_chars is not None:
            content = content[:max_content_chars]

        return {
            "date": email_date,
            "time": email_time,
            "subject": subject,
            "from": from_header,
            "content": content,
            "message_id": message_id,
            "gmail_id": gmail_id,
        }

    def _list_message_ids(self, service, query: str,
                          max_results: Optional[int] = None) -> List[str]:
        """Return Gmail IDs of all messages matching ``query``, following ``nextPageToken``.

        Stops once ``max_results`` IDs have been collected, when given.
        """
        ids: List[str] = []
        page_token: Optional[str] = None
        while max_results is None or len(ids) < max_results:
            params = {'userId': 'me', 'q': query, 'maxResults': self.PAGE_SIZE}
            if max_results is not None:
                params['maxResults'] = min(self.PAGE_SIZE, max_results - len(ids))
            if page_token:
                params['pageToken'] = page_token
            response = service.users().messages().list(**params).execute()
            ids.extend(ref['id'] for ref in response.get('messages', []))
            page_token = response.get('nextPageToken')
            if not page_token:
                break
        return ids if max_results is None else ids[:max_results]

    def _fetch_full_message(self, service, gmail_id: str) -> Dict:
        """Fetch one message by Gmail ID with ``format='full'``."""
        return service.users().messages().get(
            userId='me',
            id=gmail_id,
            format='full'
        ).execute()

    def search_emails(self, keyword: str, start_date: str, end_date: str,
                      max_results: Optional[int] = None) -> List[Dict]:
        """Search emails containing keyword within date range using Gmail API.

        Gmail search pre-filters by date and keyword. Every hit is then fetched
        in full and re-checked locally: its parsed date must lie within
        ``start_date``..``end_date`` inclusive, and ``keyword`` must appear
        (case-insensitively) in the subject, sender or full body text.

        Args:
            keyword: Keyword to search for in emails. It is inserted verbatim
                into the Gmail query, so Gmail search operators in it apply.
            start_date: Start date in DD-MMM-YYYY format
            end_date: End date in DD-MMM-YYYY format
            max_results: Optional cap on how many Gmail search hits are fetched.
                ``None`` (default) reads every page of results.

        Returns:
            List of email dictionaries (see ``_parse_email_message``), newest
            first, with ``content`` truncated to ``MAX_CONTENT_CHARS``.

        Raises:
            Exception: If not authenticated, or if the Gmail API search fails.
            ValueError: If a date argument is not in DD-MMM-YYYY format.
        """
        logger.info(f"Searching emails containing '{keyword}' between {start_date} and {end_date}")

        service = self.oauth_manager.get_gmail_service()
        if not service:
            raise Exception("Not authenticated. Please authenticate first using the setup tool.")

        try:
            start_dt = self._parse_date_string(start_date)
            end_dt = self._parse_date_string(end_date)

            # before: excludes the given day, so query up to the day after end_date.
            query = ' '.join([
                f'after:{self._format_date_for_query(start_dt)}',
                f'before:{self._format_date_for_query(end_dt + timedelta(days=1))}',
                f'({keyword})',
            ])
            logger.info(f"Gmail API query: {query}")

            message_ids = self._list_message_ids(service, query, max_results)
            logger.info(f"Found {len(message_ids)} messages")
            if not message_ids:
                return []

            keyword_lower = keyword.lower()
            scraped_emails = []

            for i, gmail_id in enumerate(message_ids, start=1):
                try:
                    logger.info(f"Processing email {i}/{len(message_ids)}")
                    message = self._fetch_full_message(service, gmail_id)
                    # Parse untruncated so the keyword check sees the whole body.
                    parsed_email = self._parse_email_message(message, max_content_chars=None)

                    # Re-check the window locally against the parsed date.
                    email_dt = self._parse_date_string(parsed_email['date'])
                    if not start_dt <= email_dt <= end_dt:
                        continue

                    haystacks = (parsed_email['subject'], parsed_email['from'], parsed_email['content'])
                    if any(keyword_lower in text.lower() for text in haystacks):
                        parsed_email['content'] = parsed_email['content'][:self.MAX_CONTENT_CHARS]
                        scraped_emails.append(parsed_email)
                except googleapiclient.errors.HttpError as e:
                    logger.error(f"Error fetching message {gmail_id}: {e}")
                except Exception as e:
                    logger.error(f"Error processing message {gmail_id}: {e}")

            scraped_emails.sort(
                key=lambda x: datetime.strptime(f"{x['date']} {x['time']}", "%d-%b-%Y %H:%M:%S"),
                reverse=True
            )

            logger.info(f"Successfully processed {len(scraped_emails)} emails containing '{keyword}'")
            return scraped_emails

        except googleapiclient.errors.HttpError as e:
            logger.error(f"Gmail API error: {e}")
            raise Exception(f"Gmail API error: {e}") from e
        except Exception as e:
            logger.error(f"Email search failed: {e}")
            raise

    def get_email_by_id(self, message_id: str) -> Optional[Dict]:
        """Get email details by message ID or Gmail ID

        The value is first tried as a Gmail message ID. If that request fails,
        it is looked up as an RFC 822 Message-ID via an ``rfc822msgid:`` search.

        Args:
            message_id: Either the Message-ID header or Gmail message ID

        Returns:
            Email dictionary, or None if not found, empty, or on any error
            (errors are logged).

        Raises:
            Exception: If not authenticated.
        """
        service = self.oauth_manager.get_gmail_service()
        if not service:
            raise Exception("Not authenticated. Please authenticate first using the setup tool.")

        if not message_id:
            return None

        try:
            try:
                return self._parse_email_message(self._fetch_full_message(service, message_id))
            except googleapiclient.errors.HttpError:
                # Not usable as a Gmail ID; fall back to a Message-ID header search.
                pass

            results = service.users().messages().list(
                userId='me',
                q=f'rfc822msgid:{message_id}',
                maxResults=1
            ).execute()

            messages = results.get('messages', [])
            if not messages:
                return None

            return self._parse_email_message(self._fetch_full_message(service, messages[0]['id']))

        except Exception as e:
            logger.error(f"Failed to get email {message_id}: {e}")
            return None

    def is_authenticated(self) -> bool:
        """Return the OAuth manager's authentication status."""
        return self.oauth_manager.is_authenticated()

    def get_user_email(self) -> Optional[str]:
        """Return the authenticated user's email address as reported by the OAuth manager."""
        return self.oauth_manager.get_user_email()

    def authenticate(self) -> bool:
        """Trigger interactive authentication through the OAuth manager."""
        return self.oauth_manager.authenticate_interactive()
|
|
|
|
|
# Module-level instance, constructed once at import time; presumably the
# object other modules import and share — confirm against callers.
gmail_scraper: GmailAPIScraper = GmailAPIScraper()