import email
import email.header
import email.message
import email.utils
import imaplib
import re
import smtplib
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any, Dict, List, Optional, Tuple

from utils.error_handling import IntegrationError, handle_exceptions
from utils.logging import setup_logger
from utils.storage import load_data, save_data

# Initialize logger
logger = setup_logger(__name__)

# Subject markers that set a task priority. Evaluated in insertion order, so
# when both match, "high" wins. Keywords are anchored on word boundaries so
# that e.g. "unimportant" does not count as "important".
_PRIORITY_PATTERNS = {
    "low": re.compile(r"\[low\]|\(low\)|\blow priority\b", re.IGNORECASE),
    "high": re.compile(
        r"\[high\]|\(high\)|\bhigh priority\b|\burgent\b|\bimportant\b",
        re.IGNORECASE,
    ),
}

# Body patterns that capture a numeric due date such as 31/12/2024 or 12-31-24.
_DUE_DATE_PATTERNS = (
    re.compile(
        r"due\s*(?:date|by)?\s*:?\s*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})",
        re.IGNORECASE,
    ),
    re.compile(r"deadline\s*:?\s*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})", re.IGNORECASE),
)

# Formats tried in order once separators are normalised to "/". Day-first is
# preferred over month-first; 4-digit years are tried before 2-digit years.
_DUE_DATE_FORMATS = ("%d/%m/%Y", "%m/%d/%Y", "%d/%m/%y", "%m/%d/%y")


class EmailIntegration:
    """Email integration for converting emails to tasks and sending notifications"""

    def __init__(self, email_address: Optional[str] = None,
                 password: Optional[str] = None,
                 imap_server: str = "imap.gmail.com", imap_port: int = 993,
                 smtp_server: str = "smtp.gmail.com", smtp_port: int = 587):
        """Initialize Email integration

        Args:
            email_address: Email address (optional)
            password: Email password or app password (optional)
            imap_server: IMAP server address (default: imap.gmail.com)
            imap_port: IMAP server port (default: 993)
            smtp_server: SMTP server address (default: smtp.gmail.com)
            smtp_port: SMTP server port (default: 587)
        """
        self.email_address = email_address
        self.password = password
        self.imap_server = imap_server
        self.imap_port = imap_port
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port

    @handle_exceptions
    def set_credentials(self, email_address: str, password: str) -> None:
        """Set email credentials

        Args:
            email_address: Email address
            password: Email password or app password
        """
        self.email_address = email_address
        self.password = password

    @handle_exceptions
    def set_imap_server(self, server: str, port: int = 993) -> None:
        """Set IMAP server settings

        Args:
            server: IMAP server address
            port: IMAP server port (default: 993)
        """
        self.imap_server = server
        self.imap_port = port

    @handle_exceptions
    def set_smtp_server(self, server: str, port: int = 587) -> None:
        """Set SMTP server settings

        Args:
            server: SMTP server address
            port: SMTP server port (default: 587)
        """
        self.smtp_server = server
        self.smtp_port = port

    @handle_exceptions
    def test_connection(self) -> bool:
        """Test email connection

        Logs in to the IMAP server over SSL and to the SMTP server over
        STARTTLS with the stored credentials.

        Returns:
            True if connection is successful, False otherwise
        """
        if not self.email_address or not self.password:
            logger.error("Email credentials not set")
            return False

        try:
            # Context managers close both connections even when login fails.
            with imaplib.IMAP4_SSL(self.imap_server, self.imap_port) as imap:
                imap.login(self.email_address, self.password)

            with smtplib.SMTP(self.smtp_server, self.smtp_port) as smtp:
                smtp.ehlo()
                smtp.starttls()
                smtp.login(self.email_address, self.password)

            return True
        except Exception as e:
            logger.error(f"Email connection test failed: {str(e)}")
            return False

    @handle_exceptions
    def fetch_emails(self, folder: str = "INBOX", limit: int = 10,
                     unread_only: bool = True, search_criteria: str = "ALL",
                     peek: bool = False) -> List[Dict[str, Any]]:
        """Fetch emails from the specified folder

        Args:
            folder: Email folder to fetch from (default: INBOX)
            limit: Maximum number of emails to fetch; values <= 0 disable
                the limit (default: 10)
            unread_only: Only fetch unread emails (default: True). Combined
                with ``search_criteria`` rather than replacing it.
            search_criteria: IMAP search criteria (default: ALL)
            peek: When True, open the folder read-only and fetch with
                BODY.PEEK[] so the fetch itself does not mark messages as
                read. The default (False) keeps the historical RFC822 fetch,
                which implicitly sets the \\Seen flag on the server.

        Returns:
            List of email data dictionaries, newest first

        Raises:
            IntegrationError: If credentials are missing or the IMAP
                conversation fails.
        """
        if not self.email_address or not self.password:
            raise IntegrationError("Email credentials not set")

        criteria = self._build_search_criteria(search_criteria, unread_only)
        fetch_spec = "(FLAGS BODY.PEEK[])" if peek else "(FLAGS RFC822)"

        try:
            # The context manager logs out on every exit path, including the
            # early returns and exceptions that previously leaked the socket.
            with imaplib.IMAP4_SSL(self.imap_server, self.imap_port) as imap:
                imap.login(self.email_address, self.password)

                status, _ = imap.select(folder, readonly=peek)
                if status != "OK":
                    raise IntegrationError(
                        f"Failed to select folder {folder}: {status}"
                    )

                status, data = imap.search(None, criteria)
                if status != "OK":
                    raise IntegrationError(f"Failed to search emails: {status}")

                email_ids = data[0].split() if data and data[0] else []
                if not email_ids:
                    return []

                # Keep only the most recent ``limit`` messages.
                if limit > 0:
                    email_ids = email_ids[-limit:]

                emails = []
                for email_id in reversed(email_ids):  # Process newest first
                    status, msg_data = imap.fetch(email_id, fetch_spec)
                    if status != "OK":
                        logger.error(f"Failed to fetch email {email_id}: {status}")
                        continue

                    raw_email, flags = self._split_fetch_response(msg_data)
                    if raw_email is None:
                        # e.g. the message was expunged between SEARCH and FETCH
                        logger.error(
                            f"Failed to fetch email {email_id}: empty response"
                        )
                        continue

                    email_message = email.message_from_bytes(raw_email)
                    emails.append(
                        self._parse_email(email_message, email_id, flags)
                    )

                return emails
        except Exception as e:
            logger.error(f"Failed to fetch emails: {str(e)}")
            raise IntegrationError(f"Failed to fetch emails: {str(e)}") from e

    @staticmethod
    def _build_search_criteria(search_criteria: str, unread_only: bool) -> str:
        """Combine the caller's IMAP search criteria with the unread filter."""
        criteria = (search_criteria or "").strip() or "ALL"
        if not unread_only:
            return criteria
        if criteria.upper() in ("ALL", "(ALL)"):
            return "(UNSEEN)"
        # IMAP ANDs the search keys inside a parenthesised list.
        return f"(UNSEEN {criteria})"

    @staticmethod
    def _split_fetch_response(msg_data) -> Tuple[Optional[bytes], Tuple[bytes, ...]]:
        """Extract the raw message bytes and the FLAGS from a FETCH response."""
        raw_email = None
        flags: Tuple[bytes, ...] = ()
        for item in msg_data or []:
            if isinstance(item, tuple):
                # (response header, literal payload)
                header = item[0]
                if raw_email is None:
                    raw_email = item[1]
            else:
                # Trailing fragment; FLAGS may appear here instead of up front.
                header = item
            if isinstance(header, bytes):
                flags += imaplib.ParseFlags(header)
        return raw_email, flags

    @staticmethod
    def _decode_payload(part: email.message.Message) -> str:
        """Decode a message part's payload to text using its declared charset."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # Unknown or misspelled charset label: fall back to UTF-8.
            return payload.decode("utf-8", errors="replace")

    def _parse_email(self, email_message: email.message.Message, email_id: bytes,
                     flags: Optional[Tuple[bytes, ...]] = None) -> Dict[str, Any]:
        """Parse email message into a dictionary

        Args:
            email_message: Email message object
            email_id: Email ID
            flags: IMAP flags reported for the message in the FETCH response
                (optional). When omitted, ``is_read`` falls back to
                ``email_message.get_flags()`` if the object provides it.

        Returns:
            Email data dictionary with keys id, subject, from, to, date
            (ISO 8601 string or None), body and is_read
        """
        # Extract headers
        subject = self._decode_header(email_message.get("Subject", ""))
        from_addr = self._decode_header(email_message.get("From", ""))
        to_addr = self._decode_header(email_message.get("To", ""))
        date_str = email_message.get("Date", "")

        # Parse date; a malformed Date header leaves the date unset.
        date = None
        if date_str:
            try:
                date = email.utils.parsedate_to_datetime(str(date_str))
            except (TypeError, ValueError):
                date = None

        # Extract body
        if email_message.is_multipart():
            text_parts = []
            for part in email_message.walk():
                content_disposition = str(part.get("Content-Disposition", ""))

                # Skip attachments
                if "attachment" in content_disposition:
                    continue

                # Get text content
                if part.get_content_type() == "text/plain":
                    text_parts.append(self._decode_payload(part))
            body = "".join(text_parts)
        else:
            # Not multipart - get payload directly
            body = self._decode_payload(email_message)

        if flags is not None:
            # IMAP system flags are case-insensitive.
            is_read = any(flag.lower() == b"\\seen" for flag in flags)
        elif hasattr(email_message, "get_flags"):
            is_read = "\\Seen" in email_message.get_flags()
        else:
            is_read = False

        # Create email data dictionary
        return {
            "id": email_id.decode(),
            "subject": subject,
            "from": from_addr,
            "to": to_addr,
            "date": date.isoformat() if date else None,
            "body": body,
            "is_read": is_read,
        }

    def _decode_header(self, header: str) -> str:
        """Decode email header

        Args:
            header: Email header

        Returns:
            Decoded header; the header's plain string form if decoding fails
        """
        if not header:
            return ""

        try:
            decoded_parts = []
            for part, encoding in email.header.decode_header(header):
                if not isinstance(part, bytes):
                    decoded_parts.append(str(part))
                    continue
                try:
                    decoded_parts.append(part.decode(encoding or "utf-8"))
                except (LookupError, UnicodeDecodeError):
                    # Unknown charset label or bytes that do not match it.
                    decoded_parts.append(part.decode("utf-8", errors="replace"))
            return "".join(decoded_parts)
        except Exception:
            # Best effort: a malformed header must not abort parsing the mail.
            return str(header)

    @handle_exceptions
    def mark_as_read(self, email_id: str, folder: str = "INBOX") -> bool:
        """Mark an email as read

        Args:
            email_id: Email ID as returned in the "id" field by fetch_emails.
                NOTE(review): fetch_emails uses SEARCH, not UID SEARCH, so
                these look like message sequence numbers, which can shift
                between sessions - confirm whether UIDs are needed.
            folder: Email folder (default: INBOX)

        Returns:
            True if successful, False otherwise

        Raises:
            IntegrationError: If credentials are not set.
        """
        if not self.email_address or not self.password:
            raise IntegrationError("Email credentials not set")

        try:
            message_set = email_id.encode() if isinstance(email_id, str) else email_id

            with imaplib.IMAP4_SSL(self.imap_server, self.imap_port) as imap:
                imap.login(self.email_address, self.password)

                status, _ = imap.select(folder)
                if status != "OK":
                    logger.error(
                        f"Failed to mark email as read: cannot select {folder}: {status}"
                    )
                    return False

                # Mark as read
                status, _ = imap.store(message_set, "+FLAGS", "\\Seen")
                if status != "OK":
                    logger.error(f"Failed to mark email as read: {status}")
                    return False

            return True
        except Exception as e:
            logger.error(f"Failed to mark email as read: {str(e)}")
            return False

    @handle_exceptions
    def send_email(self, to_addr: str, subject: str, body: str,
                   html_body: Optional[str] = None) -> bool:
        """Send an email

        Args:
            to_addr: Recipient email address
            subject: Email subject
            body: Email body (plain text)
            html_body: Email body (HTML, optional)

        Returns:
            True if successful, False otherwise

        Raises:
            IntegrationError: If credentials are not set.
        """
        if not self.email_address or not self.password:
            raise IntegrationError("Email credentials not set")

        try:
            # Create message
            msg = MIMEMultipart("alternative")
            msg["Subject"] = subject
            msg["From"] = self.email_address
            msg["To"] = to_addr

            # Plain text first, HTML last: in multipart/alternative the last
            # part is the preferred rendering.
            msg.attach(MIMEText(body, "plain"))
            if html_body:
                msg.attach(MIMEText(html_body, "html"))

            # The context manager closes the connection even if sending fails.
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as smtp:
                smtp.ehlo()
                smtp.starttls()
                smtp.login(self.email_address, self.password)
                smtp.send_message(msg)

            return True
        except Exception as e:
            logger.error(f"Failed to send email: {str(e)}")
            return False

    @staticmethod
    def _parse_due_date(date_text: str) -> Optional[str]:
        """Parse a numeric d/m/y or m/d/y date into an ISO 8601 string."""
        normalized = date_text.replace("-", "/")
        for date_format in _DUE_DATE_FORMATS:
            try:
                return datetime.strptime(normalized, date_format).isoformat()
            except ValueError:
                continue
        return None

    @handle_exceptions
    def convert_email_to_task(self, email_data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert an email to a task

        The title is the subject with any priority marker removed. Priority
        defaults to "medium". The due date is read from a "due"/"deadline"
        phrase in the body when one parses as a date.

        Args:
            email_data: Email data dictionary

        Returns:
            Task data dictionary
        """
        # Extract task information from email; tolerate explicit None values.
        subject = email_data.get("subject") or ""
        body = email_data.get("body") or ""
        sender = email_data.get("from") or ""
        date = email_data.get("date")

        # Extract priority from subject if present
        title = subject
        priority = "medium"
        for level, pattern in _PRIORITY_PATTERNS.items():
            if pattern.search(subject):
                priority = level
                # Remove priority tag from title
                title = pattern.sub("", title).strip()

        # Tidy the gap left by a removed tag; never return an empty title
        # when the subject consisted only of the priority marker.
        title = re.sub(r"\s{2,}", " ", title) or subject.strip()

        # Extract due date from body if present. A later pattern overrides an
        # earlier one only when its date actually parses.
        due_date = None
        for pattern in _DUE_DATE_PATTERNS:
            match = pattern.search(body)
            if match:
                parsed = self._parse_due_date(match.group(1))
                if parsed is not None:
                    due_date = parsed

        # Create task data
        return {
            "title": title,
            "description": f"From: {sender}\n\n{body[:500]}{'...' if len(body) > 500 else ''}",
            "status": "todo",
            "priority": priority,
            "due_date": due_date,
            "created_at": datetime.now().isoformat(),
            "source": "email",
            "source_id": email_data.get("id"),
            "metadata": {
                "email_from": sender,
                "email_date": date,
                "email_subject": subject,
            },
        }

    @handle_exceptions
    def process_emails_to_tasks(self, folder: str = "INBOX", limit: int = 10,
                                unread_only: bool = True,
                                mark_as_read: bool = True) -> List[Dict[str, Any]]:
        """Process emails and convert them to tasks

        Args:
            folder: Email folder to fetch from (default: INBOX)
            limit: Maximum number of emails to fetch (default: 10)
            unread_only: Only fetch unread emails (default: True)
            mark_as_read: Mark processed emails as read (default: True)

        Returns:
            List of task data dictionaries
        """
        # Fetch without touching the \Seen flag so that ``mark_as_read=False``
        # really leaves messages unread; marking happens explicitly below.
        # ``or []`` guards against the decorator returning None on failure
        # (presumably possible - its behaviour is not visible here).
        emails = self.fetch_emails(folder, limit, unread_only, peek=True) or []

        tasks = []
        for email_data in emails:
            # Convert email to task
            tasks.append(self.convert_email_to_task(email_data))

            # Mark email as read if requested
            if mark_as_read and not self.mark_as_read(email_data["id"], folder):
                logger.warning(
                    f"Could not mark email {email_data['id']} as read"
                )

        return tasks