MailQuery / agentic_implementation /gmail_api_scraper.py
Da-123's picture
authChange (#10)
c5c5634 verified
#!/usr/bin/env python3
"""
Gmail API-based Email Scraper with OAuth Authentication
"""
import base64
import html
import re
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText
from email.utils import parsedate_to_datetime
from typing import List, Dict, Optional

import googleapiclient.errors

from oauth_manager import oauth_manager
from logger import logger
class GmailAPIScraper:
    """Gmail API-based email scraper using OAuth authentication.

    Every Gmail call goes through the service object returned by
    ``oauth_manager.get_gmail_service()``. Parsed emails are plain dicts with
    the keys ``date`` (DD-MMM-YYYY), ``time`` (HH:MM:SS), ``subject``,
    ``from``, ``content``, ``message_id`` and ``gmail_id``; ``date`` and
    ``time`` are expressed in Indian Standard Time.
    """

    # IST never observes DST, so a fixed +05:30 offset is exact and does not
    # depend on a tz database being installed on the host.
    _IST = timezone(timedelta(hours=5, minutes=30))
    _DATE_FORMAT = "%d-%b-%Y"
    _TIME_FORMAT = "%H:%M:%S"
    # Maximum number of body characters kept in a parsed email dict.
    _MAX_CONTENT_CHARS = 2000
    # Maximum number of message references requested from a search.
    _MAX_SEARCH_RESULTS = 500
    # <script>/<style> elements carry no readable text; drop them with their
    # contents before the generic tag removal.
    _HIDDEN_BLOCK_RE = re.compile(
        r'<(script|style)\b[^>]*>.*?</\1\s*>', re.IGNORECASE | re.DOTALL
    )
    _TAG_RE = re.compile(r'<[^>]+>')

    def __init__(self):
        """Initialize the Gmail API scraper with the shared OAuth manager."""
        self.oauth_manager = oauth_manager

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    def _require_service(self):
        """Return the Gmail service object.

        Raises:
            Exception: If the OAuth manager yields no service (the user has
                not authenticated yet).
        """
        service = self.oauth_manager.get_gmail_service()
        if not service:
            raise Exception("Not authenticated. Please authenticate first using the setup tool.")
        return service

    @staticmethod
    def _fetch_full_message(service, gmail_id: str) -> Dict:
        """Fetch one message in ``full`` format by its Gmail ID."""
        return service.users().messages().get(
            userId='me',
            id=gmail_id,
            format='full',
        ).execute()

    def _parse_date_string(self, date_str: str) -> datetime:
        """Parse a DD-MMM-YYYY string (e.g. ``05-Mar-2024``) into a datetime.

        Raises:
            ValueError: If ``date_str`` does not match the expected format.
        """
        try:
            return datetime.strptime(date_str, self._DATE_FORMAT)
        except ValueError as exc:
            raise ValueError(
                f"Invalid date format: {date_str}. Expected DD-MMM-YYYY"
            ) from exc

    def _format_date_for_query(self, date_obj: datetime) -> str:
        """Format a datetime as ``YYYY/MM/DD`` for a Gmail search query."""
        return date_obj.strftime("%Y/%m/%d")

    # ------------------------------------------------------------------
    # Body extraction
    # ------------------------------------------------------------------
    def _decode_message_part(self, part: Dict) -> str:
        """Decode the base64url body of a message part to text.

        Returns an empty string when the part has no inline body data or the
        data is not valid base64. Bytes that are not valid UTF-8 are dropped.
        """
        data = (part.get('body') or {}).get('data') or ''
        if not data:
            return ''

        # Pad to a multiple of four; ``-len % 4`` adds nothing when the
        # length is already aligned.
        padded = data + '=' * (-len(data) % 4)
        try:
            decoded_bytes = base64.urlsafe_b64decode(padded)
        except ValueError as exc:  # binascii.Error is a ValueError subclass
            # One malformed part should not discard the rest of the message.
            logger.warning(f"Failed to decode message part: {exc}")
            return ''
        return decoded_bytes.decode('utf-8', errors='ignore')

    @classmethod
    def _strip_html(cls, markup: str) -> str:
        """Reduce HTML markup to its text content.

        Tags are removed before entities are unescaped so that an escaped
        ``&lt;b&gt;`` survives as literal text instead of being stripped.
        """
        without_hidden = cls._HIDDEN_BLOCK_RE.sub('', markup)
        return html.unescape(cls._TAG_RE.sub('', without_hidden))

    def _collect_text_parts(self, part: Dict, plain: List[str],
                            markup: List[str], is_root: bool = False) -> None:
        """Walk a payload tree, appending decoded text bodies in order.

        ``text/plain`` bodies are appended to ``plain`` and ``text/html``
        bodies to ``markup``. Children are visited for the root payload and
        for any ``multipart/*`` container, at any nesting depth.
        """
        mime_type = part.get('mimeType', '')
        children = part.get('parts')
        if children and (is_root or mime_type.startswith('multipart/')):
            for child in children:
                self._collect_text_parts(child, plain, markup)
            return

        if mime_type == 'text/plain':
            plain.append(self._decode_message_part(part))
        elif mime_type == 'text/html':
            markup.append(self._decode_message_part(part))

    def _extract_email_content(self, message: Dict) -> str:
        """Extract readable body text from a Gmail API message.

        Plain-text parts are preferred. HTML parts are used, with markup
        stripped, only when the message has no non-empty plain-text body, so
        a multipart/alternative message does not yield its body twice.

        Returns:
            The stripped body text, or ``""`` if there is no payload or no
            text part.
        """
        payload = message.get('payload')
        if not payload:
            return ""

        plain_chunks: List[str] = []
        html_chunks: List[str] = []
        self._collect_text_parts(payload, plain_chunks, html_chunks, is_root=True)

        plain_text = "".join(plain_chunks).strip()
        if plain_text:
            return plain_text
        return "".join(self._strip_html(chunk) for chunk in html_chunks).strip()

    # ------------------------------------------------------------------
    # Header / metadata parsing
    # ------------------------------------------------------------------
    def _get_header_value(self, headers: List[Dict], name: str) -> str:
        """Return the value of the first header matching ``name``.

        The comparison is case-insensitive; ``""`` is returned when no header
        matches.
        """
        wanted = name.lower()
        for header in headers:
            if header.get('name', '').lower() == wanted:
                return header.get('value', '')
        return ''

    def _sent_datetime_ist(self, message: Dict, date_header: str) -> Optional[datetime]:
        """Resolve the message timestamp as an IST-aware datetime.

        The ``Date`` header is tried first. If it is missing or unparsable,
        the message's ``internalDate`` field (epoch milliseconds) is used.
        Returns ``None`` when neither source yields a timestamp.
        """
        if date_header:
            try:
                sent_at = parsedate_to_datetime(date_header)
                if sent_at.tzinfo is None:
                    # parsedate_to_datetime returns a naive datetime for a
                    # "-0000" zone; such a value represents UTC, not the
                    # local time of the machine running this code.
                    sent_at = sent_at.replace(tzinfo=timezone.utc)
                return sent_at.astimezone(self._IST)
            except (TypeError, ValueError, IndexError, OverflowError) as exc:
                logger.warning(f"Failed to parse date {date_header}: {exc}")

        internal_ms = message.get('internalDate')
        if internal_ms is not None:
            try:
                return datetime.fromtimestamp(int(internal_ms) / 1000, tz=self._IST)
            except (TypeError, ValueError, OverflowError, OSError) as exc:
                logger.warning(f"Failed to parse internalDate {internal_ms}: {exc}")
        return None

    def _parse_email_message(self, message: Dict) -> Dict:
        """Parse a Gmail API message into the scraper's dict format.

        Missing headers fall back to ``'No Subject'`` / ``'Unknown Sender'``,
        and a missing ``Message-ID`` falls back to the Gmail ID. When no
        timestamp can be resolved, the date is today's date and the time is
        ``00:00:00``. ``content`` is truncated to ``_MAX_CONTENT_CHARS``.
        """
        headers = (message.get('payload') or {}).get('headers') or []
        gmail_id = message.get('id', '')

        subject = self._get_header_value(headers, 'Subject') or 'No Subject'
        from_header = self._get_header_value(headers, 'From') or 'Unknown Sender'
        date_header = self._get_header_value(headers, 'Date')
        message_id = self._get_header_value(headers, 'Message-ID') or gmail_id

        sent_at = self._sent_datetime_ist(message, date_header)
        if sent_at is None:
            email_date = datetime.now().strftime(self._DATE_FORMAT)
            email_time = "00:00:00"
        else:
            email_date = sent_at.strftime(self._DATE_FORMAT)
            email_time = sent_at.strftime(self._TIME_FORMAT)

        content = self._extract_email_content(message)

        return {
            "date": email_date,
            "time": email_time,
            "subject": subject,
            "from": from_header,
            "content": content[:self._MAX_CONTENT_CHARS],  # Limit content length
            "message_id": message_id,
            "gmail_id": gmail_id,
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def search_emails(self, keyword: str, start_date: str, end_date: str) -> List[Dict]:
        """Search emails containing keyword within date range using Gmail API

        Args:
            keyword: Keyword to search for in emails
            start_date: Start date in DD-MMM-YYYY format (inclusive)
            end_date: End date in DD-MMM-YYYY format (inclusive)

        Returns:
            List of email dictionaries, newest first. Only messages whose
            IST date lies in the range and whose subject, sender or full body
            contains ``keyword`` (case-insensitive) are returned.

        Raises:
            ValueError: If a date string is not in DD-MMM-YYYY format.
            Exception: If not authenticated, or if the Gmail API list call
                fails (the original ``HttpError`` is chained as the cause).
        """
        logger.info(f"Searching emails containing '{keyword}' between {start_date} and {end_date}")

        service = self._require_service()

        try:
            start_dt = self._parse_date_string(start_date)
            end_dt = self._parse_date_string(end_date)

            # Gmail interprets date-only operators in its own timezone
            # (documented as PST), not IST, so an exact window would miss
            # mail from the early IST hours of the start date. Over-fetch by
            # a day on each side; the per-message date check below restores
            # the exact inclusive range.
            after_date = self._format_date_for_query(start_dt - timedelta(days=1))
            before_date = self._format_date_for_query(end_dt + timedelta(days=2))

            # Gmail API search syntax: https://developers.google.com/gmail/api/guides/filtering
            query = ' '.join([
                f'after:{after_date}',
                f'before:{before_date}',
                f'({keyword})',  # Search in all fields
            ])
            logger.info(f"Gmail API query: {query}")

            results = service.users().messages().list(
                userId='me',
                q=query,
                maxResults=self._MAX_SEARCH_RESULTS,
            ).execute()

            messages = results.get('messages', [])
            logger.info(f"Found {len(messages)} messages")
            if not messages:
                return []

            keyword_lower = keyword.lower()
            total = len(messages)
            scraped_emails = []

            for position, msg_ref in enumerate(messages, start=1):
                gmail_id = msg_ref.get('id', '')
                try:
                    logger.info(f"Processing email {position}/{total}")
                    message = self._fetch_full_message(service, gmail_id)
                    parsed_email = self._parse_email_message(message)

                    # Enforce the exact inclusive range on the IST date.
                    email_dt = self._parse_date_string(parsed_email['date'])
                    if not (start_dt <= email_dt <= end_dt):
                        continue

                    # Match against the untruncated body: the stored
                    # 'content' is cut to a fixed length and could hide a
                    # keyword that appears later in the message.
                    searchable = (
                        parsed_email['subject'],
                        parsed_email['from'],
                        self._extract_email_content(message),
                    )
                    if any(keyword_lower in text.lower() for text in searchable):
                        scraped_emails.append(parsed_email)

                except googleapiclient.errors.HttpError as e:
                    logger.error(f"Error fetching message {gmail_id}: {e}")
                    continue
                except Exception as e:
                    # Best effort: one bad message must not abort the search.
                    logger.error(f"Error processing message {gmail_id}: {e}")
                    continue

            # Sort by date (newest first)
            scraped_emails.sort(
                key=lambda item: datetime.strptime(
                    f"{item['date']} {item['time']}", "%d-%b-%Y %H:%M:%S"
                ),
                reverse=True,
            )

            logger.info(f"Successfully processed {len(scraped_emails)} emails containing '{keyword}'")
            return scraped_emails

        except googleapiclient.errors.HttpError as e:
            logger.error(f"Gmail API error: {e}")
            raise Exception(f"Gmail API error: {e}") from e
        except Exception as e:
            logger.error(f"Email search failed: {e}")
            raise

    def get_email_by_id(self, message_id: str) -> Optional[Dict]:
        """Get email details by message ID or Gmail ID

        Args:
            message_id: Either the Message-ID header or Gmail message ID

        Returns:
            Email dictionary or None if not found (or if the lookup fails;
            the failure is logged).

        Raises:
            Exception: If not authenticated.
        """
        service = self._require_service()

        try:
            # Try the value as a Gmail ID first.
            try:
                return self._parse_email_message(
                    self._fetch_full_message(service, message_id)
                )
            except googleapiclient.errors.HttpError:
                # Not a Gmail ID (or the direct lookup failed); fall through
                # to the Message-ID header search below.
                pass

            results = service.users().messages().list(
                userId='me',
                q=f'rfc822msgid:{message_id}',
                maxResults=1,
            ).execute()

            messages = results.get('messages', [])
            if not messages:
                return None

            return self._parse_email_message(
                self._fetch_full_message(service, messages[0]['id'])
            )

        except Exception as e:
            # Deliberate best effort: callers receive None on any failure.
            logger.error(f"Failed to get email {message_id}: {e}")
            return None

    def is_authenticated(self) -> bool:
        """Check if user is authenticated (delegates to the OAuth manager)."""
        return self.oauth_manager.is_authenticated()

    def get_user_email(self) -> Optional[str]:
        """Get authenticated user's email address (delegates to the OAuth manager)."""
        return self.oauth_manager.get_user_email()

    def authenticate(self) -> bool:
        """Trigger interactive authentication (delegates to the OAuth manager)."""
        return self.oauth_manager.authenticate_interactive()
# Global scraper instance, created when this module is imported.
# Construction only stores a reference to the shared oauth_manager.
gmail_scraper = GmailAPIScraper()