import gradio as gr import json import os from typing import Dict, List from datetime import datetime, timedelta from dotenv import load_dotenv # Import OAuth-enabled modules from tools import extract_query_info, analyze_emails from gmail_api_scraper import gmail_scraper from oauth_manager import oauth_manager from logger import logger load_dotenv() def check_authentication() -> tuple[bool, str]: """Check if user is authenticated and return status info""" if oauth_manager.is_authenticated(): user_email = oauth_manager.get_user_email() return True, user_email or "authenticated" return False, "not authenticated" def authenticate_user() -> str: """ Start OAuth authentication flow for Gmail access. Opens a browser window for user to authenticate with Google. Returns: str: JSON string containing authentication result """ try: logger.info("Starting OAuth authentication flow...") # Check if OAuth is configured if not oauth_manager.client_secrets_file.exists(): return json.dumps({ "error": "OAuth not configured", "message": "Please run 'python setup_oauth.py' first to configure OAuth credentials.", "success": False }, indent=2) # Start authentication success = oauth_manager.authenticate_interactive() if success: user_email = oauth_manager.get_current_account() result = { "success": True, "message": "Authentication successful! You can now use the email tools.", "user_email": user_email, "instructions": [ "Authentication completed successfully", "You can now search emails, get email details, and analyze patterns", f"Currently authenticated as: {user_email}" ] } else: result = { "success": False, "error": "Authentication failed", "message": "Please try again or check your internet connection.", "instructions": [ "Make sure you have internet connection", "Ensure you complete the authentication in the browser", "Try running 'python setup_oauth.py' if problems persist" ] } return json.dumps(result, indent=2) except Exception as e: logger.error("Error in authenticate_user: %s", e) error_result = { "success": False, "error": str(e), "message": "Authentication failed due to an error." } return json.dumps(error_result, indent=2) def switch_account(target_email: str) -> str: """ Switch to a different authenticated Gmail account. Args: target_email (str): Email address to switch to Returns: str: JSON string containing switch result """ try: logger.info("Switching to account: %s", target_email) # Check if target account is authenticated if not oauth_manager.is_authenticated(target_email): return json.dumps({ "error": "Account not authenticated", "message": f"Account '{target_email}' is not authenticated. Please authenticate first.", "target_email": target_email, "authenticated_accounts": list(oauth_manager.list_accounts().keys()) }, indent=2) # Switch account success = oauth_manager.switch_account(target_email) if success: result = { "success": True, "message": f"Successfully switched to account: {target_email}", "current_account": oauth_manager.get_current_account(), "previous_account": None # Could track this if needed } else: result = { "success": False, "error": "Failed to switch account", "message": f"Could not switch to account: {target_email}", "current_account": oauth_manager.get_current_account() } return json.dumps(result, indent=2) except Exception as e: logger.error("Error switching account: %s", e) error_result = { "success": False, "error": str(e), "message": f"Failed to switch to account: {target_email}" } return json.dumps(error_result, indent=2) def list_accounts() -> str: """ List all authenticated Gmail accounts and their status. Returns: str: JSON string containing all accounts and their authentication status """ try: logger.info("Listing all accounts") accounts = oauth_manager.list_accounts() current_account = oauth_manager.get_current_account() result = { "accounts": accounts, "current_account": current_account, "total_accounts": len(accounts), "authenticated_accounts": [email for email, is_auth in accounts.items() if is_auth], "message": f"Found {len(accounts)} stored accounts, currently using: {current_account or 'None'}" } return json.dumps(result, indent=2) except Exception as e: logger.error("Error listing accounts: %s", e) error_result = { "error": str(e), "message": "Failed to list accounts" } return json.dumps(error_result, indent=2) def remove_account(email_to_remove: str) -> str: """ Remove an authenticated Gmail account and its stored credentials. Args: email_to_remove (str): Email address to remove Returns: str: JSON string containing removal result """ try: logger.info("Removing account: %s", email_to_remove) # Check if account exists accounts = oauth_manager.list_accounts() if email_to_remove not in accounts: return json.dumps({ "error": "Account not found", "message": f"Account '{email_to_remove}' not found in stored accounts.", "available_accounts": list(accounts.keys()) }, indent=2) # Remove account oauth_manager.remove_account(email_to_remove) result = { "success": True, "message": f"Successfully removed account: {email_to_remove}", "removed_account": email_to_remove, "current_account": oauth_manager.get_current_account(), "remaining_accounts": list(oauth_manager.list_accounts().keys()) } return json.dumps(result, indent=2) except Exception as e: logger.error("Error removing account: %s", e) error_result = { "success": False, "error": str(e), "message": f"Failed to remove account: {email_to_remove}" } return json.dumps(error_result, indent=2) def search_emails(query: str) -> str: """ Search for emails based on a natural language query using OAuth authentication. Args: query (str): Natural language query (e.g., "show me mails from swiggy last week") Returns: str: JSON string containing email search results and analysis """ try: logger.info("OAuth Email search tool called with query: %s", query) # Check authentication is_auth, auth_info = check_authentication() if not is_auth: return json.dumps({ "error": "Not authenticated", "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'", "auth_status": auth_info }, indent=2) # Extract sender keyword and date range from query query_info = extract_query_info(query) sender_keyword = query_info.get("sender_keyword", "") start_date = query_info.get("start_date") end_date = query_info.get("end_date") logger.info(f"Searching for emails with keyword '{sender_keyword}' between {start_date} and {end_date}") # Use Gmail API scraper with OAuth full_emails = gmail_scraper.search_emails(sender_keyword, start_date, end_date) if not full_emails: result = { "query_info": query_info, "email_summary": [], "analysis": {"summary": f"No emails found for '{sender_keyword}' in the specified date range.", "insights": []}, "email_count": 0, "user_email": auth_info } return json.dumps(result, indent=2) # Create summary version without full content email_summary = [] for email in full_emails: summary_email = { "date": email.get("date"), "time": email.get("time"), "subject": email.get("subject"), "from": email.get("from", "Unknown Sender"), "message_id": email.get("message_id"), "gmail_id": email.get("gmail_id") } email_summary.append(summary_email) # Auto-analyze the emails for insights analysis = analyze_emails(full_emails) # Return summary info with analysis result = { "query_info": query_info, "email_summary": email_summary, "analysis": analysis, "email_count": len(full_emails), "user_email": auth_info } return json.dumps(result, indent=2) except Exception as e: logger.error("Error in search_emails: %s", e) error_result = { "error": str(e), "query": query, "message": "Failed to search emails. Please check your authentication and try again." } return json.dumps(error_result, indent=2) def get_email_details(message_id: str) -> str: """ Get full details of a specific email by its message ID using OAuth authentication. Args: message_id (str): The message ID of the email to retrieve Returns: str: JSON string containing the full email details """ try: logger.info("Getting email details for message_id: %s", message_id) # Check authentication is_auth, auth_info = check_authentication() if not is_auth: return json.dumps({ "error": "Not authenticated", "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'", "auth_status": auth_info }, indent=2) # Get email using Gmail API email = gmail_scraper.get_email_by_id(message_id) if email: email["user_email"] = auth_info return json.dumps(email, indent=2) else: error_result = { "error": f"No email found with message_id '{message_id}'", "message": "Email may not exist or you may not have access to it.", "user_email": auth_info } return json.dumps(error_result, indent=2) except Exception as e: logger.error("Error in get_email_details: %s", e) error_result = { "error": str(e), "message_id": message_id, "message": "Failed to retrieve email details." } return json.dumps(error_result, indent=2) def analyze_email_patterns(sender_keyword: str, days_back: str = "30") -> str: """ Analyze email patterns from a specific sender over a given time period using OAuth authentication. Args: sender_keyword (str): The sender/company keyword to analyze (e.g., "amazon", "google") days_back (str): Number of days to look back (default: "30") Returns: str: JSON string containing email pattern analysis """ try: logger.info("Analyzing email patterns for sender: %s, days_back: %s", sender_keyword, days_back) # Check authentication is_auth, auth_info = check_authentication() if not is_auth: return json.dumps({ "error": "Not authenticated", "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'", "auth_status": auth_info }, indent=2) # Calculate date range days_int = int(days_back) end_date = datetime.today() start_date = end_date - timedelta(days=days_int) start_date_str = start_date.strftime("%d-%b-%Y") end_date_str = end_date.strftime("%d-%b-%Y") # Search for emails using Gmail API full_emails = gmail_scraper.search_emails(sender_keyword, start_date_str, end_date_str) if not full_emails: result = { "sender_keyword": sender_keyword, "date_range": f"{start_date_str} to {end_date_str}", "analysis": {"summary": f"No emails found from '{sender_keyword}' in the last {days_back} days.", "insights": []}, "email_count": 0, "user_email": auth_info } return json.dumps(result, indent=2) # Analyze the emails analysis = analyze_emails(full_emails) result = { "sender_keyword": sender_keyword, "date_range": f"{start_date_str} to {end_date_str}", "analysis": analysis, "email_count": len(full_emails), "user_email": auth_info } return json.dumps(result, indent=2) except Exception as e: logger.error("Error in analyze_email_patterns: %s", e) error_result = { "error": str(e), "sender_keyword": sender_keyword, "message": "Failed to analyze email patterns." } return json.dumps(error_result, indent=2) def get_authentication_status() -> str: """ Get current authentication status and account information. Returns: str: JSON string containing authentication status """ try: current_account = oauth_manager.get_current_account() is_auth = oauth_manager.is_authenticated() if current_account else False all_accounts = oauth_manager.list_accounts() result = { "authenticated": is_auth, "current_account": current_account, "status": "authenticated" if is_auth else "not_authenticated", "message": f"Current account: {current_account}" if is_auth else "No account selected or not authenticated", "all_accounts": all_accounts, "total_accounts": len(all_accounts), "authenticated_accounts": [email for email, auth in all_accounts.items() if auth] } if not is_auth and not oauth_manager.client_secrets_file.exists(): result["setup_required"] = True result["message"] = "OAuth not configured. Please run 'python setup_oauth.py' first." elif not is_auth and current_account: result["message"] = f"Account {current_account} needs re-authentication" elif not current_account and all_accounts: result["message"] = "Accounts available but none selected. Use switch_account to select one." return json.dumps(result, indent=2) except Exception as e: logger.error("Error checking authentication status: %s", e) return json.dumps({ "error": str(e), "message": "Failed to check authentication status" }, indent=2) # Create Gradio interfaces search_interface = gr.Interface( fn=search_emails, inputs=[ gr.Textbox(label="Query", placeholder="Show me emails from amazon last week") ], outputs=gr.Textbox(label="Search Results", lines=20), title="Email Search (OAuth)", description="Search your emails using natural language queries with OAuth authentication" ) details_interface = gr.Interface( fn=get_email_details, inputs=[ gr.Textbox(label="Message ID", placeholder="Email message ID from search results") ], outputs=gr.Textbox(label="Email Details", lines=20), title="Email Details (OAuth)", description="Get full details of a specific email by message ID with OAuth authentication" ) analysis_interface = gr.Interface( fn=analyze_email_patterns, inputs=[ gr.Textbox(label="Sender Keyword", placeholder="amazon, google, linkedin, etc."), gr.Textbox(label="Days Back", value="30", placeholder="Number of days to analyze") ], outputs=gr.Textbox(label="Analysis Results", lines=20), title="Email Pattern Analysis (OAuth)", description="Analyze email patterns from a specific sender over time with OAuth authentication" ) auth_interface = gr.Interface( fn=authenticate_user, inputs=[], outputs=gr.Textbox(label="Authentication Result", lines=10), title="Authenticate with Gmail", description="Click Submit to start OAuth authentication flow with Gmail" ) status_interface = gr.Interface( fn=get_authentication_status, inputs=[], outputs=gr.Textbox(label="Authentication Status", lines=15), title="Authentication Status", description="Check current authentication status and view all accounts" ) switch_interface = gr.Interface( fn=switch_account, inputs=[ gr.Textbox(label="Target Email", placeholder="email@gmail.com") ], outputs=gr.Textbox(label="Switch Result", lines=10), title="Switch Account", description="Switch to a different authenticated Gmail account" ) accounts_interface = gr.Interface( fn=list_accounts, inputs=[], outputs=gr.Textbox(label="Accounts List", lines=15), title="List All Accounts", description="View all authenticated Gmail accounts and their status" ) remove_interface = gr.Interface( fn=remove_account, inputs=[ gr.Textbox(label="Email to Remove", placeholder="email@gmail.com") ], outputs=gr.Textbox(label="Removal Result", lines=10), title="Remove Account", description="Remove an authenticated Gmail account and its credentials" ) # Combine interfaces into a tabbed interface demo = gr.TabbedInterface( [auth_interface, status_interface, accounts_interface, switch_interface, remove_interface, search_interface, details_interface, analysis_interface], ["šŸ” Authenticate", "šŸ“Š Status", "šŸ‘„ All Accounts", "šŸ”„ Switch Account", "šŸ—‘ļø Remove Account", "šŸ“§ Email Search", "šŸ“„ Email Details", "šŸ“ˆ Pattern Analysis"], title="šŸ“§ Gmail Assistant MCP Server (Multi-Account OAuth)" ) if __name__ == "__main__": # Set environment variable to enable MCP server import os os.environ["GRADIO_MCP_SERVER"] = "True" # Check authentication status on startup current_account = oauth_manager.get_current_account() all_accounts = oauth_manager.list_accounts() if current_account and oauth_manager.is_authenticated(): print(f"āœ… Currently authenticated as: {current_account}") if len(all_accounts) > 1: print(f"šŸ“± {len(all_accounts)} total accounts available: {list(all_accounts.keys())}") elif all_accounts: print(f"šŸ“± {len(all_accounts)} stored accounts found: {list(all_accounts.keys())}") print("āš ļø No current account selected. Use the web interface or Claude to switch accounts.") else: print("āŒ No authenticated accounts. Users will need to authenticate through the web interface.") print("šŸ’” Or run 'python setup_oauth.py' for initial setup.") # Launch the server demo.launch(share=False) print("\nšŸš€ MCP Server is running!") print("šŸ“ MCP Endpoint: http://localhost:7860/gradio_api/mcp/sse") print("šŸ“– Copy this URL to your Claude Desktop MCP configuration") print("\nšŸ”— Web Interface: http://localhost:7860") print("šŸ“ Use the web interface to authenticate and test the tools")