|
import email
import email.header
import email.message
import email.utils
import imaplib
import re
import smtplib
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, List, Any, Optional, Tuple

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, IntegrationError
from utils.storage import load_data, save_data
|
|
|
|
|
# Module-level logger used by EmailIntegration for error reporting.
logger = setup_logger(__name__)
|
|
|
class EmailIntegration:
    """Email integration for converting emails to tasks and sending notifications.

    Mail is read over IMAP-over-SSL and sent over SMTP with STARTTLS. Every
    network operation opens its own connection inside a ``with`` block, so the
    connection is closed before the method returns, whether it succeeds or
    fails.

    Most public methods are wrapped in ``handle_exceptions`` (imported from
    ``utils.error_handling``); what that decorator does with an exception is
    not visible in this module.
    """

    # Subject-line markers that set a task's priority. Word boundaries keep
    # words such as "unimportant" from being read as "important".
    _PRIORITY_PATTERNS = {
        "low": r"\[low\]|\(low\)|\blow priority\b",
        "high": r"\[high\]|\(high\)|\bhigh priority\b|\burgent\b|\bimportant\b",
    }

    # Body patterns whose first group captures a numeric date.
    _DUE_DATE_PATTERNS = (
        r"due\s*(?:date|by)?\s*:?\s*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})",
        r"deadline\s*:?\s*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})",
    )

    # Formats tried in order once separators are normalised to "/".
    # Day-first is tried before month-first, so an ambiguous date such as
    # 03/04/2024 is read as 3 April.
    _DUE_DATE_FORMATS = ("%d/%m/%Y", "%m/%d/%Y", "%d/%m/%y", "%m/%d/%y")

    def __init__(self, email_address: Optional[str] = None, password: Optional[str] = None,
                 imap_server: str = "imap.gmail.com", imap_port: int = 993,
                 smtp_server: str = "smtp.gmail.com", smtp_port: int = 587):
        """Initialize Email integration.

        No connection is opened here; the values are only stored.

        Args:
            email_address: Email address (optional)
            password: Email password or app password (optional)
            imap_server: IMAP server address (default: imap.gmail.com)
            imap_port: IMAP server port (default: 993)
            smtp_server: SMTP server address (default: smtp.gmail.com)
            smtp_port: SMTP server port (default: 587)
        """
        self.email_address = email_address
        self.password = password
        self.imap_server = imap_server
        self.imap_port = imap_port
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port

    @handle_exceptions
    def set_credentials(self, email_address: str, password: str) -> None:
        """Set email credentials.

        Args:
            email_address: Email address
            password: Email password or app password
        """
        self.email_address = email_address
        self.password = password

    @handle_exceptions
    def set_imap_server(self, server: str, port: int = 993) -> None:
        """Set IMAP server settings.

        Args:
            server: IMAP server address
            port: IMAP server port (default: 993)
        """
        self.imap_server = server
        self.imap_port = port

    @handle_exceptions
    def set_smtp_server(self, server: str, port: int = 587) -> None:
        """Set SMTP server settings.

        Args:
            server: SMTP server address
            port: SMTP server port (default: 587)
        """
        self.smtp_server = server
        self.smtp_port = port

    @handle_exceptions
    def test_connection(self) -> bool:
        """Test email connection.

        Logs in to the IMAP server, then to the SMTP server (after STARTTLS),
        using the stored credentials.

        Returns:
            True if both logins succeed, False if credentials are missing or
            any step fails (the failure is logged).
        """
        if not self.email_address or not self.password:
            logger.error("Email credentials not set")
            return False

        try:
            # Leaving each ``with`` block logs out / quits, including when
            # login raises, so a failed test does not leave a socket open.
            with imaplib.IMAP4_SSL(self.imap_server, self.imap_port) as imap:
                imap.login(self.email_address, self.password)

            with smtplib.SMTP(self.smtp_server, self.smtp_port) as smtp:
                smtp.ehlo()
                smtp.starttls()
                smtp.login(self.email_address, self.password)

            return True
        except Exception as e:
            logger.error(f"Email connection test failed: {str(e)}")
            return False

    @staticmethod
    def _build_search_criteria(search_criteria: str, unread_only: bool) -> str:
        """Combine the caller's IMAP search criteria with the unread filter."""
        criteria = (search_criteria or "").strip() or "ALL"
        if not unread_only:
            return criteria
        if criteria.upper() in ("ALL", "(ALL)"):
            return "(UNSEEN)"
        # Search keys inside one parenthesised list are ANDed by the server.
        return f"(UNSEEN {criteria})"

    @handle_exceptions
    def fetch_emails(self, folder: str = "INBOX", limit: int = 10,
                     unread_only: bool = True, search_criteria: str = "ALL") -> List[Dict[str, Any]]:
        """Fetch emails from the specified folder.

        Messages are fetched with ``BODY.PEEK[]`` so that reading them does
        not set the ``\\Seen`` flag; use ``mark_as_read`` for that.

        Args:
            folder: Email folder to fetch from (default: INBOX)
            limit: Maximum number of emails to fetch; values <= 0 mean no
                limit (default: 10)
            unread_only: Only fetch unread emails; combined with
                ``search_criteria`` rather than replacing it (default: True)
            search_criteria: IMAP search criteria (default: ALL)

        Returns:
            List of email data dictionaries, newest message id first.

        Raises:
            IntegrationError: If credentials are missing or any IMAP step
                fails.
        """
        if not self.email_address or not self.password:
            raise IntegrationError("Email credentials not set")

        criteria = self._build_search_criteria(search_criteria, unread_only)

        try:
            with imaplib.IMAP4_SSL(self.imap_server, self.imap_port) as imap:
                imap.login(self.email_address, self.password)

                # select() reports a missing folder through its status
                # instead of raising, so it has to be checked explicitly.
                status, _ = imap.select(folder)
                if status != "OK":
                    raise IntegrationError(f"Failed to select folder {folder}: {status}")

                status, data = imap.search(None, criteria)
                if status != "OK":
                    raise IntegrationError(f"Failed to search emails: {status}")

                email_ids = data[0].split() if data and data[0] else []

                # Ids come back in ascending order; keep the last `limit`.
                if limit > 0:
                    email_ids = email_ids[-limit:]

                emails = []
                for email_id in reversed(email_ids):
                    status, data = imap.fetch(email_id, "(BODY.PEEK[])")
                    if status != "OK":
                        logger.error(f"Failed to fetch email {email_id}: {status}")
                        continue

                    # A message removed between SEARCH and FETCH produces no
                    # (envelope, payload) tuple.
                    if not data or not isinstance(data[0], tuple):
                        logger.error(f"Failed to fetch email {email_id}: empty response")
                        continue

                    email_message = email.message_from_bytes(data[0][1])
                    emails.append(self._parse_email(email_message, email_id))

                return emails
        except Exception as e:
            logger.error(f"Failed to fetch emails: {str(e)}")
            raise IntegrationError(f"Failed to fetch emails: {str(e)}") from e

    @staticmethod
    def _decode_text_part(part: email.message.Message) -> str:
        """Decode one non-multipart message part to text using its charset."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""

        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # Charset name Python does not know: fall back to UTF-8 rather
            # than dropping the text.
            return payload.decode("utf-8", errors="replace")

    def _parse_email(self, email_message: email.message.Message, email_id: bytes) -> Dict[str, Any]:
        """Parse email message into a dictionary.

        Args:
            email_message: Email message object
            email_id: Email ID (bytes as returned by IMAP; str is accepted)

        Returns:
            Email data dictionary with the keys ``id``, ``subject``, ``from``,
            ``to``, ``date`` (ISO 8601 string or None), ``body`` (plain text)
            and ``is_read``.
        """
        subject = self._decode_header(email_message.get("Subject", ""))
        from_addr = self._decode_header(email_message.get("From", ""))
        to_addr = self._decode_header(email_message.get("To", ""))
        date_str = email_message.get("Date", "")

        date = None
        if date_str:
            try:
                date = email.utils.parsedate_to_datetime(str(date_str))
            except (TypeError, ValueError):
                # Malformed Date header: leave the date unset (best effort).
                date = None

        if email_message.is_multipart():
            text_parts = []
            for part in email_message.walk():
                content_disposition = str(part.get("Content-Disposition", ""))
                # Attachments are skipped; header values are case-insensitive.
                if "attachment" in content_disposition.lower():
                    continue
                if part.get_content_type() == "text/plain":
                    text_parts.append(self._decode_text_part(part))
            body = "".join(text_parts)
        else:
            body = self._decode_text_part(email_message)

        # NOTE(review): email.message.Message has no get_flags(), so for
        # messages parsed by fetch_emails this is always False; the guard only
        # matters for Message subclasses that provide the method.
        if hasattr(email_message, "get_flags"):
            is_read = "\\Seen" in email_message.get_flags()
        else:
            is_read = False

        return {
            "id": email_id.decode() if isinstance(email_id, bytes) else str(email_id),
            "subject": subject,
            "from": from_addr,
            "to": to_addr,
            "date": date.isoformat() if date else None,
            "body": body,
            "is_read": is_read,
        }

    def _decode_header(self, header: str) -> str:
        """Decode email header.

        Args:
            header: Raw header value, possibly containing RFC 2047 encoded
                words

        Returns:
            Decoded header as ``str``; the stringified input when it cannot
            be decoded, and "" for an empty header.
        """
        if not header:
            return ""

        try:
            decoded_header = email.header.decode_header(header)
        except Exception:
            # Best effort: a malformed header must not abort parsing of the
            # whole message.
            return str(header)

        decoded_parts = []
        for part, encoding in decoded_header:
            if not isinstance(part, bytes):
                decoded_parts.append(str(part))
                continue
            try:
                decoded_parts.append(part.decode(encoding or "utf-8"))
            except (LookupError, ValueError):
                # Unknown charset name or bytes invalid for it.
                decoded_parts.append(part.decode("utf-8", errors="replace"))

        return "".join(decoded_parts)

    @handle_exceptions
    def mark_as_read(self, email_id: str, folder: str = "INBOX") -> bool:
        """Mark an email as read.

        Args:
            email_id: Email ID (str, as produced by ``fetch_emails``; bytes
                are accepted too)
            folder: Email folder (default: INBOX)

        Returns:
            True if the server accepted the flag change, False otherwise
            (the failure is logged).

        Raises:
            IntegrationError: If credentials are missing.
        """
        if not self.email_address or not self.password:
            raise IntegrationError("Email credentials not set")

        message_id = email_id if isinstance(email_id, bytes) else str(email_id).encode()

        try:
            with imaplib.IMAP4_SSL(self.imap_server, self.imap_port) as imap:
                imap.login(self.email_address, self.password)

                # select() and store() report "NO" through the status rather
                # than raising, so check both before claiming success.
                status, _ = imap.select(folder)
                if status != "OK":
                    logger.error(f"Failed to mark email as read: cannot select {folder} ({status})")
                    return False

                status, _ = imap.store(message_id, "+FLAGS", "\\Seen")
                if status != "OK":
                    logger.error(f"Failed to mark email as read: {status}")
                    return False

            return True
        except Exception as e:
            logger.error(f"Failed to mark email as read: {str(e)}")
            return False

    @handle_exceptions
    def send_email(self, to_addr: str, subject: str, body: str,
                   html_body: Optional[str] = None) -> bool:
        """Send an email.

        Args:
            to_addr: Recipient email address
            subject: Email subject
            body: Email body (plain text)
            html_body: Email body (HTML, optional)

        Returns:
            True if successful, False otherwise (the failure is logged).

        Raises:
            IntegrationError: If credentials are missing.
        """
        if not self.email_address or not self.password:
            raise IntegrationError("Email credentials not set")

        try:
            msg = MIMEMultipart("alternative")
            msg["Subject"] = subject
            msg["From"] = self.email_address
            msg["To"] = to_addr

            # In multipart/alternative the last part is the preferred one, so
            # plain text goes first and HTML (if any) after it.
            msg.attach(MIMEText(body, "plain"))
            if html_body:
                msg.attach(MIMEText(html_body, "html"))

            with smtplib.SMTP(self.smtp_server, self.smtp_port) as smtp:
                smtp.ehlo()
                smtp.starttls()
                smtp.login(self.email_address, self.password)
                smtp.send_message(msg)

            return True
        except Exception as e:
            logger.error(f"Failed to send email: {str(e)}")
            return False

    @classmethod
    def _parse_due_date(cls, text: str) -> Optional[str]:
        """Parse a numeric date such as 31/12/2024 or 12-31-24 to ISO format."""
        normalized = text.replace("-", "/")
        for fmt in cls._DUE_DATE_FORMATS:
            try:
                return datetime.strptime(normalized, fmt).isoformat()
            except ValueError:
                continue
        return None

    @handle_exceptions
    def convert_email_to_task(self, email_data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert an email to a task.

        The subject becomes the title. A priority marker in the subject
        ("[high]", "urgent", "low priority", ...) sets the priority and is
        removed from the title. A "due ..." or "deadline ..." date in the
        body becomes the due date; when several patterns yield a parseable
        date, the later pattern wins.

        Args:
            email_data: Email data dictionary

        Returns:
            Task data dictionary
        """
        subject = email_data.get("subject") or ""
        body = email_data.get("body") or ""
        sender = email_data.get("from", "")
        date = email_data.get("date")

        title = subject
        priority = "medium"
        for level, pattern in self._PRIORITY_PATTERNS.items():
            if re.search(pattern, subject, re.IGNORECASE):
                priority = level
                title = re.sub(pattern, "", title, flags=re.IGNORECASE).strip()

        # A subject consisting only of a marker (e.g. "URGENT") would leave
        # an empty title; keep the original subject in that case.
        if not title:
            title = subject.strip()

        due_date = None
        for pattern in self._DUE_DATE_PATTERNS:
            match = re.search(pattern, body, re.IGNORECASE)
            if not match:
                continue
            parsed = self._parse_due_date(match.group(1))
            if parsed is not None:
                due_date = parsed

        preview = body[:500] + ("..." if len(body) > 500 else "")

        return {
            "title": title,
            "description": f"From: {sender}\n\n{preview}",
            "status": "todo",
            "priority": priority,
            "due_date": due_date,
            "created_at": datetime.now().isoformat(),
            "source": "email",
            "source_id": email_data.get("id"),
            "metadata": {
                "email_from": sender,
                "email_date": date,
                "email_subject": subject,
            },
        }

    @handle_exceptions
    def process_emails_to_tasks(self, folder: str = "INBOX", limit: int = 10,
                                unread_only: bool = True, mark_as_read: bool = True) -> List[Dict[str, Any]]:
        """Process emails and convert them to tasks.

        Args:
            folder: Email folder to fetch from (default: INBOX)
            limit: Maximum number of emails to fetch (default: 10)
            unread_only: Only fetch unread emails (default: True)
            mark_as_read: Mark processed emails as read (default: True)

        Returns:
            List of task data dictionaries
        """
        emails = self.fetch_emails(folder, limit, unread_only)

        tasks = []
        # NOTE(review): fetch_emails is wrapped by handle_exceptions, whose
        # failure behaviour is not visible here; tolerate a None result.
        for email_data in emails or []:
            tasks.append(self.convert_email_to_task(email_data))

            # `mark_as_read` the parameter shadows the method name locally,
            # so the method is reached through `self`.
            if mark_as_read:
                self.mark_as_read(email_data["id"], folder)

        return tasks