#!/usr/bin/env python3
"""
Gmail MCP Server with OAuth Authentication and Multi-Account Support
"""

import json
import os
from collections import Counter
from datetime import datetime, timedelta
from typing import Dict, List

import gradio as gr
from dotenv import load_dotenv

# Import OAuth-enabled modules
from tools import extract_query_info, analyze_emails
from gmail_api_scraper import GmailAPIScraper
from oauth_manager import oauth_manager
from logger import logger

load_dotenv()

# Initialize Gmail API scraper
gmail_scraper = GmailAPIScraper()

# Date format used for every date string handed to the Gmail scraper.
DATE_FORMAT = "%d-%b-%Y"


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _to_json(payload: dict) -> str:
    """Serialize a result payload the same way for every tool."""
    return json.dumps(payload, indent=2)


def _not_authenticated_response(auth_info: str) -> str:
    """Build the JSON error returned by tools that need an authenticated account."""
    return _to_json({
        "error": "Not authenticated",
        "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'",
        "auth_status": auth_info,
    })


def _sender_domain(sender: str) -> str:
    """Return the grouping key for a ``From`` value.

    The address is taken from between ``<`` and ``>`` when both are present,
    and the part after the last ``@`` is used. When no ``@`` is found the
    whole sender string is the key.
    """
    if "<" in sender and ">" in sender:
        address = sender.split("<")[1].split(">")[0]
    else:
        address = sender
    return address.split("@")[-1] if "@" in address else sender


def check_authentication() -> tuple[bool, str]:
    """Check if user is authenticated and return status

    Returns:
        tuple[bool, str]: ``(True, current_account)`` when an account is
        selected and authenticated, otherwise ``(False, "Not authenticated")``.
    """
    current_account = oauth_manager.get_current_account()
    if current_account and oauth_manager.is_authenticated():
        return True, current_account
    return False, "Not authenticated"


def simple_analyze_emails(emails: List[Dict]) -> dict:
    """
    Simple email analysis without OpenAI - just basic statistics and patterns

    Args:
        emails: Email dicts; the "from", "subject" and "date" keys are read.
            Missing or ``None`` values are tolerated.

    Returns:
        dict: ``{"summary": str, "insights": list[str]}``
    """
    if not emails:
        return {"summary": "No emails to analyze.", "insights": []}

    # Basic statistics
    total_count = len(emails)

    senders = Counter()
    word_counts = Counter()
    unique_dates = set()

    for email in emails:
        # ``or`` (rather than a .get default) also covers keys present with None.
        sender = email.get("from") or "Unknown"
        senders[_sender_domain(sender)] += 1

        # Count common words in subjects (simple approach): only words
        # longer than 3 characters.
        subject = email.get("subject") or ""
        word_counts.update(w for w in subject.lower().split() if len(w) > 3)

        # An empty date is "unknown", not a distinct day.
        date = email.get("date")
        if date:
            unique_dates.add(date)

    # Create insights
    insights = [f"Found {total_count} emails total"]

    top_domain, top_count = senders.most_common(1)[0]
    insights.append(f"Most emails from: {top_domain} ({top_count} emails)")

    if len(senders) > 1:
        insights.append(f"Emails from {len(senders)} different domains")

    # Date range
    if len(unique_dates) > 1:
        insights.append(f"Spanning {len(unique_dates)} different days")

    # Subject analysis
    if word_counts:
        word, occurrences = word_counts.most_common(1)[0]
        if occurrences > 1:
            insights.append(f"Common subject word: '{word}' appears {occurrences} times")

    summary = f"Analysis of {total_count} emails from {len(senders)} sender(s)"

    return {
        "summary": summary,
        "insights": insights,
    }


# ---------------------------------------------------------------------------
# Tool functions (each one backs a Gradio interface below)
# ---------------------------------------------------------------------------

def authenticate_user() -> str:
    """
    Start OAuth authentication flow for Gmail access.
    Opens a browser window for user to authenticate with Google.

    Returns:
        str: JSON string containing authentication result
    """
    try:
        logger.info("Starting OAuth authentication flow...")

        # Check if OAuth is configured
        if not oauth_manager.client_secrets_file.exists():
            return _to_json({
                "error": "OAuth not configured",
                "message": "Please run 'python setup_oauth.py' first to configure OAuth credentials.",
                "success": False,
            })

        # Start authentication
        success = oauth_manager.authenticate_interactive()

        if success:
            user_email = oauth_manager.get_current_account()
            result = {
                "success": True,
                "message": "Authentication successful! You can now use the email tools.",
                "user_email": user_email,
                "instructions": [
                    "Authentication completed successfully",
                    "You can now search emails, get email details, and analyze patterns",
                    f"Currently authenticated as: {user_email}",
                ],
            }
        else:
            result = {
                "success": False,
                "error": "Authentication failed",
                "message": "Please try again or check your internet connection.",
                "instructions": [
                    "Make sure you have internet connection",
                    "Ensure you complete the authentication in the browser",
                    "Try running 'python setup_oauth.py' if problems persist",
                ],
            }

        return _to_json(result)

    except Exception as e:
        logger.error("Error in authenticate_user: %s", e)
        return _to_json({
            "success": False,
            "error": str(e),
            "message": "Authentication failed due to an error.",
        })


def switch_account(target_email: str) -> str:
    """
    Switch to a different authenticated Gmail account.

    Args:
        target_email (str): Email address to switch to

    Returns:
        str: JSON string containing switch result
    """
    try:
        logger.info("Switching to account: %s", target_email)

        # Text boxes may deliver surrounding whitespace or nothing at all.
        target_email = (target_email or "").strip()
        if not target_email:
            return _to_json({
                "success": False,
                "error": "No account specified",
                "message": "Please provide the email address of the account to switch to.",
            })

        # Check if target account is authenticated
        if not oauth_manager.is_authenticated(target_email):
            return _to_json({
                "success": False,
                "error": "Account not authenticated",
                "message": f"Account '{target_email}' is not authenticated. Please authenticate first.",
                "target_email": target_email,
                "authenticated_accounts": list(oauth_manager.list_accounts().keys()),
            })

        # Remember the active account before switching so it can be reported.
        previous_account = oauth_manager.get_current_account()

        # Switch account
        success = oauth_manager.switch_account(target_email)

        if success:
            result = {
                "success": True,
                "message": f"Successfully switched to account: {target_email}",
                "current_account": oauth_manager.get_current_account(),
                "previous_account": previous_account,
            }
        else:
            result = {
                "success": False,
                "error": "Failed to switch account",
                "message": f"Could not switch to account: {target_email}",
                "current_account": oauth_manager.get_current_account(),
            }

        return _to_json(result)

    except Exception as e:
        logger.error("Error switching account: %s", e)
        return _to_json({
            "success": False,
            "error": str(e),
            "message": f"Failed to switch to account: {target_email}",
        })


def list_accounts() -> str:
    """
    List all authenticated Gmail accounts and their status.

    Returns:
        str: JSON string containing all accounts and their authentication status
    """
    try:
        logger.info("Listing all accounts")

        accounts = oauth_manager.list_accounts()
        current_account = oauth_manager.get_current_account()

        result = {
            "accounts": accounts,
            "current_account": current_account,
            "total_accounts": len(accounts),
            "authenticated_accounts": [email for email, is_auth in accounts.items() if is_auth],
            "message": f"Found {len(accounts)} stored accounts, currently using: {current_account or 'None'}",
        }

        return _to_json(result)

    except Exception as e:
        logger.error("Error listing accounts: %s", e)
        return _to_json({
            "error": str(e),
            "message": "Failed to list accounts",
        })


def remove_account(email_to_remove: str) -> str:
    """
    Remove an authenticated Gmail account and its stored credentials.

    Args:
        email_to_remove (str): Email address to remove

    Returns:
        str: JSON string containing removal result
    """
    try:
        logger.info("Removing account: %s", email_to_remove)

        # Text boxes may deliver surrounding whitespace or nothing at all.
        email_to_remove = (email_to_remove or "").strip()

        # Check if account exists
        accounts = oauth_manager.list_accounts()
        if email_to_remove not in accounts:
            return _to_json({
                "success": False,
                "error": "Account not found",
                "message": f"Account '{email_to_remove}' not found in stored accounts.",
                "available_accounts": list(accounts.keys()),
            })

        # Remove account
        oauth_manager.remove_account(email_to_remove)

        result = {
            "success": True,
            "message": f"Successfully removed account: {email_to_remove}",
            "removed_account": email_to_remove,
            "current_account": oauth_manager.get_current_account(),
            "remaining_accounts": list(oauth_manager.list_accounts().keys()),
        }

        return _to_json(result)

    except Exception as e:
        logger.error("Error removing account: %s", e)
        return _to_json({
            "success": False,
            "error": str(e),
            "message": f"Failed to remove account: {email_to_remove}",
        })


def search_emails(sender_keyword: str, start_date: str = "", end_date: str = "") -> str:
    """
    Search for emails from a specific sender within a date range using OAuth authentication.

    Args:
        sender_keyword (str): The sender/company keyword to search for (e.g., "apple", "amazon")
        start_date (str): Start date in DD-MMM-YYYY format (e.g., "01-Jan-2025"). If empty, defaults to 7 days ago.
        end_date (str): End date in DD-MMM-YYYY format (e.g., "07-Jan-2025"). If empty, defaults to today.

    Returns:
        str: JSON string containing email search results and analysis
    """
    try:
        logger.info(
            "OAuth Email search tool called with sender: %s, dates: %s to %s",
            sender_keyword, start_date, end_date,
        )

        # Check authentication
        is_auth, auth_info = check_authentication()
        if not is_auth:
            return _not_authenticated_response(auth_info)

        # Treat None and whitespace-only input like an empty field so the
        # documented defaults apply.
        start_date = (start_date or "").strip()
        end_date = (end_date or "").strip()

        # Set default date range if not provided
        today = datetime.today()
        if not end_date:
            end_date = today.strftime(DATE_FORMAT)
        if not start_date:
            start_date = (today - timedelta(days=7)).strftime(DATE_FORMAT)

        logger.info(
            "Searching for emails with keyword '%s' between %s and %s",
            sender_keyword, start_date, end_date,
        )

        # Use Gmail API scraper with OAuth
        full_emails = gmail_scraper.search_emails(sender_keyword, start_date, end_date)

        if not full_emails:
            return _to_json({
                "sender_keyword": sender_keyword,
                "date_range": f"{start_date} to {end_date}",
                "email_summary": [],
                "analysis": {
                    "summary": f"No emails found for '{sender_keyword}' in the specified date range.",
                    "insights": [],
                },
                "email_count": 0,
                "user_email": auth_info,
            })

        # Create summary version without full content
        email_summary = [
            {
                "date": email.get("date"),
                "time": email.get("time"),
                "subject": email.get("subject"),
                "from": email.get("from", "Unknown Sender"),
                "message_id": email.get("message_id"),
                "gmail_id": email.get("gmail_id"),
            }
            for email in full_emails
        ]

        # Auto-analyze the emails for insights (no OpenAI)
        analysis = simple_analyze_emails(full_emails)

        # Return summary info with analysis
        return _to_json({
            "sender_keyword": sender_keyword,
            "date_range": f"{start_date} to {end_date}",
            "email_summary": email_summary,
            "analysis": analysis,
            "email_count": len(full_emails),
            "user_email": auth_info,
        })

    except Exception as e:
        logger.error("Error in search_emails: %s", e)
        return _to_json({
            "error": str(e),
            "sender_keyword": sender_keyword,
            "message": "Failed to search emails.",
        })


def get_email_details(message_id: str) -> str:
    """
    Get full details of a specific email by its message ID using OAuth authentication.

    Args:
        message_id (str): The message ID of the email to retrieve

    Returns:
        str: JSON string containing the full email details
    """
    try:
        logger.info("Getting email details for message_id: %s", message_id)

        # Check authentication
        is_auth, auth_info = check_authentication()
        if not is_auth:
            return _not_authenticated_response(auth_info)

        # Get email using Gmail API
        email = gmail_scraper.get_email_by_id(message_id)

        if email:
            # Copy instead of mutating the object owned by the scraper.
            return _to_json({**email, "user_email": auth_info})

        return _to_json({
            "error": f"No email found with message_id '{message_id}'",
            "message": "Email may not exist or you may not have access to it.",
            "user_email": auth_info,
        })

    except Exception as e:
        logger.error("Error in get_email_details: %s", e)
        return _to_json({
            "error": str(e),
            "message_id": message_id,
            "message": "Failed to retrieve email details.",
        })


def analyze_email_patterns(sender_keyword: str, days_back: str = "30") -> str:
    """
    Analyze email patterns from a specific sender over a given time period using OAuth authentication.

    Args:
        sender_keyword (str): The sender/company keyword to analyze (e.g., "amazon", "google")
        days_back (str): Number of days to look back (default: "30")

    Returns:
        str: JSON string containing email pattern analysis
    """
    try:
        logger.info("Analyzing email patterns for sender: %s, days_back: %s", sender_keyword, days_back)

        # Check authentication
        is_auth, auth_info = check_authentication()
        if not is_auth:
            return _not_authenticated_response(auth_info)

        # Validate the look-back window up front: a non-numeric value would
        # otherwise surface as a raw int() error, and a negative one would
        # produce a start date after the end date.
        try:
            days_int = int(days_back)
        except (TypeError, ValueError):
            days_int = -1
        if days_int < 0:
            return _to_json({
                "error": "Invalid days_back",
                "sender_keyword": sender_keyword,
                "message": f"days_back must be a non-negative whole number of days, got {days_back!r}.",
            })

        # Calculate date range
        end_date = datetime.today()
        start_date = end_date - timedelta(days=days_int)

        start_date_str = start_date.strftime(DATE_FORMAT)
        end_date_str = end_date.strftime(DATE_FORMAT)

        # Search for emails using Gmail API
        full_emails = gmail_scraper.search_emails(sender_keyword, start_date_str, end_date_str)

        if not full_emails:
            return _to_json({
                "sender_keyword": sender_keyword,
                "date_range": f"{start_date_str} to {end_date_str}",
                "analysis": {
                    "summary": f"No emails found from '{sender_keyword}' in the last {days_int} days.",
                    "insights": [],
                },
                "email_count": 0,
                "user_email": auth_info,
            })

        # Analyze the emails (no OpenAI)
        analysis = simple_analyze_emails(full_emails)

        return _to_json({
            "sender_keyword": sender_keyword,
            "date_range": f"{start_date_str} to {end_date_str}",
            "analysis": analysis,
            "email_count": len(full_emails),
            "user_email": auth_info,
        })

    except Exception as e:
        logger.error("Error in analyze_email_patterns: %s", e)
        return _to_json({
            "error": str(e),
            "sender_keyword": sender_keyword,
            "message": "Failed to analyze email patterns.",
        })


def get_authentication_status() -> str:
    """
    Get current authentication status and account information.

    Returns:
        str: JSON string containing authentication status
    """
    try:
        current_account = oauth_manager.get_current_account()
        is_auth = oauth_manager.is_authenticated() if current_account else False
        all_accounts = oauth_manager.list_accounts()

        result = {
            "authenticated": is_auth,
            "current_account": current_account,
            "status": "authenticated" if is_auth else "not_authenticated",
            "message": f"Current account: {current_account}" if is_auth else "No account selected or not authenticated",
            "all_accounts": all_accounts,
            "total_accounts": len(all_accounts),
            "authenticated_accounts": [email for email, auth in all_accounts.items() if auth],
        }

        # Refine the message for the unauthenticated cases, most specific first.
        if not is_auth and not oauth_manager.client_secrets_file.exists():
            result["setup_required"] = True
            result["message"] = "OAuth not configured. Please run 'python setup_oauth.py' first."
        elif not is_auth and current_account:
            result["message"] = f"Account {current_account} needs re-authentication"
        elif not current_account and all_accounts:
            result["message"] = "Accounts available but none selected. Use switch_account to select one."

        return _to_json(result)

    except Exception as e:
        logger.error("Error checking authentication status: %s", e)
        return _to_json({
            "error": str(e),
            "message": "Failed to check authentication status",
        })


# ---------------------------------------------------------------------------
# Create Gradio interfaces
# ---------------------------------------------------------------------------

search_interface = gr.Interface(
    fn=search_emails,
    inputs=[
        gr.Textbox(label="Sender Keyword", placeholder="apple, amazon, etc."),
        gr.Textbox(label="Start Date (Optional)", placeholder="01-Jan-2025 (leave empty for last 7 days)"),
        gr.Textbox(label="End Date (Optional)", placeholder="07-Jan-2025 (leave empty for today)"),
    ],
    outputs=gr.Textbox(label="Search Results", lines=20),
    title="Email Search (OAuth)",
    description="Search your emails by sender keyword and date range with OAuth authentication",
)

details_interface = gr.Interface(
    fn=get_email_details,
    inputs=[
        gr.Textbox(label="Message ID", placeholder="Email message ID from search results"),
    ],
    outputs=gr.Textbox(label="Email Details", lines=20),
    title="Email Details (OAuth)",
    description="Get full details of a specific email by message ID with OAuth authentication",
)

analysis_interface = gr.Interface(
    fn=analyze_email_patterns,
    inputs=[
        gr.Textbox(label="Sender Keyword", placeholder="amazon, google, linkedin, etc."),
        gr.Textbox(label="Days Back", value="30", placeholder="Number of days to analyze"),
    ],
    outputs=gr.Textbox(label="Analysis Results", lines=20),
    title="Email Pattern Analysis (OAuth)",
    description="Analyze email patterns from a specific sender over time with OAuth authentication",
)

auth_interface = gr.Interface(
    fn=authenticate_user,
    inputs=[],
    outputs=gr.Textbox(label="Authentication Result", lines=10),
    title="Authenticate with Gmail",
    description="Click Submit to start OAuth authentication flow with Gmail",
)

status_interface = gr.Interface(
    fn=get_authentication_status,
    inputs=[],
    outputs=gr.Textbox(label="Authentication Status", lines=15),
    title="Authentication Status",
    description="Check current authentication status and view all accounts",
)

switch_interface = gr.Interface(
    fn=switch_account,
    inputs=[
        gr.Textbox(label="Target Email", placeholder="email@gmail.com"),
    ],
    outputs=gr.Textbox(label="Switch Result", lines=10),
    title="Switch Account",
    description="Switch to a different authenticated Gmail account",
)

accounts_interface = gr.Interface(
    fn=list_accounts,
    inputs=[],
    outputs=gr.Textbox(label="Accounts List", lines=15),
    title="List All Accounts",
    description="View all authenticated Gmail accounts and their status",
)

remove_interface = gr.Interface(
    fn=remove_account,
    inputs=[
        gr.Textbox(label="Email to Remove", placeholder="email@gmail.com"),
    ],
    outputs=gr.Textbox(label="Removal Result", lines=10),
    title="Remove Account",
    description="Remove an authenticated Gmail account and its credentials",
)

# Combine interfaces into a tabbed interface
demo = gr.TabbedInterface(
    [
        auth_interface,
        status_interface,
        accounts_interface,
        switch_interface,
        remove_interface,
        search_interface,
        details_interface,
        analysis_interface,
    ],
    [
        "🔐 Authenticate",
        "📊 Status",
        "👥 All Accounts",
        "🔄 Switch Account",
        "🗑️ Remove Account",
        "📧 Email Search",
        "📄 Email Details",
        "📈 Pattern Analysis",
    ],
    title="📧 Gmail Assistant MCP Server (Multi-Account OAuth)",
)


def main() -> None:
    """Report the stored-account status, start the server and block until interrupted."""
    # Set environment variable to enable MCP server
    os.environ["GRADIO_MCP_SERVER"] = "True"

    # Check authentication status on startup
    current_account = oauth_manager.get_current_account()
    all_accounts = oauth_manager.list_accounts()

    if current_account and oauth_manager.is_authenticated():
        print(f"✅ Currently authenticated as: {current_account}")
        if len(all_accounts) > 1:
            print(f"📱 {len(all_accounts)} total accounts available: {list(all_accounts.keys())}")
    elif all_accounts:
        print(f"📱 {len(all_accounts)} stored accounts found: {list(all_accounts.keys())}")
        print("⚠️ No current account selected. Use the web interface or Claude to switch accounts.")
    else:
        print("❌ No authenticated accounts. Users will need to authenticate through the web interface.")
        print("💡 Or run 'python setup_oauth.py' for initial setup.")

    # Launch the server. launch() blocks the main thread by default, which
    # would delay the banner below until shutdown; start without the thread
    # lock, print the banner, then block explicitly.
    demo.launch(share=False, prevent_thread_lock=True)

    print("\n🚀 MCP Server is running!")
    print("📍 MCP Endpoint: http://localhost:7860/gradio_api/mcp/sse")
    print("📖 Copy this URL to your Claude Desktop MCP configuration")
    print("\n🔗 Web Interface: http://localhost:7860")
    print("📝 Use the web interface to authenticate and test the tools")

    demo.block_thread()


if __name__ == "__main__":
    main()