Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
""" | |
Gmail MCP Server with OAuth Authentication and Multi-Account Support | |
""" | |
import gradio as gr | |
import json | |
import os | |
from typing import Dict, List | |
from datetime import datetime, timedelta | |
from dotenv import load_dotenv | |
# Import OAuth-enabled modules | |
from tools import extract_query_info, analyze_emails | |
from gmail_api_scraper import GmailAPIScraper | |
from oauth_manager import oauth_manager | |
from logger import logger | |
load_dotenv() | |
# Initialize Gmail API scraper | |
gmail_scraper = GmailAPIScraper() | |
def check_authentication() -> tuple[bool, str]: | |
"""Check if user is authenticated and return status""" | |
current_account = oauth_manager.get_current_account() | |
if current_account and oauth_manager.is_authenticated(): | |
return True, current_account | |
else: | |
return False, "Not authenticated" | |
def simple_analyze_emails(emails) -> dict: | |
""" | |
Simple email analysis without OpenAI - just basic statistics and patterns | |
""" | |
if not emails: | |
return {"summary": "No emails to analyze.", "insights": []} | |
# Basic statistics | |
total_count = len(emails) | |
# Group by sender | |
senders = {} | |
subjects = [] | |
dates = [] | |
for email in emails: | |
sender = email.get("from", "Unknown") | |
# Extract just the email domain for grouping | |
if "<" in sender and ">" in sender: | |
email_part = sender.split("<")[1].split(">")[0] | |
else: | |
email_part = sender | |
domain = email_part.split("@")[-1] if "@" in email_part else sender | |
senders[domain] = senders.get(domain, 0) + 1 | |
subjects.append(email.get("subject", "")) | |
dates.append(email.get("date", "")) | |
# Create insights | |
insights = [] | |
insights.append(f"Found {total_count} emails total") | |
if senders: | |
top_sender = max(senders.items(), key=lambda x: x[1]) | |
insights.append(f"Most emails from: {top_sender[0]} ({top_sender[1]} emails)") | |
if len(senders) > 1: | |
insights.append(f"Emails from {len(senders)} different domains") | |
# Date range | |
if dates: | |
unique_dates = list(set(dates)) | |
if len(unique_dates) > 1: | |
insights.append(f"Spanning {len(unique_dates)} different days") | |
# Subject analysis | |
if subjects: | |
# Count common words in subjects (simple approach) | |
all_words = [] | |
for subject in subjects: | |
words = subject.lower().split() | |
all_words.extend([w for w in words if len(w) > 3]) # Only words longer than 3 chars | |
if all_words: | |
word_counts = {} | |
for word in all_words: | |
word_counts[word] = word_counts.get(word, 0) + 1 | |
if word_counts: | |
common_word = max(word_counts.items(), key=lambda x: x[1]) | |
if common_word[1] > 1: | |
insights.append(f"Common subject word: '{common_word[0]}' appears {common_word[1]} times") | |
summary = f"Analysis of {total_count} emails from {len(senders)} sender(s)" | |
return { | |
"summary": summary, | |
"insights": insights | |
} | |
def authenticate_user() -> str: | |
""" | |
Start OAuth authentication flow for Gmail access. | |
Opens a browser window for user to authenticate with Google. | |
Returns: | |
str: JSON string containing authentication result | |
""" | |
try: | |
logger.info("Starting OAuth authentication flow...") | |
# Check if OAuth is configured | |
if not oauth_manager.client_secrets_file.exists(): | |
return json.dumps({ | |
"error": "OAuth not configured", | |
"message": "Please run 'python setup_oauth.py' first to configure OAuth credentials.", | |
"success": False | |
}, indent=2) | |
# Start authentication | |
success = oauth_manager.authenticate_interactive() | |
if success: | |
user_email = oauth_manager.get_current_account() | |
result = { | |
"success": True, | |
"message": "Authentication successful! You can now use the email tools.", | |
"user_email": user_email, | |
"instructions": [ | |
"Authentication completed successfully", | |
"You can now search emails, get email details, and analyze patterns", | |
f"Currently authenticated as: {user_email}" | |
] | |
} | |
else: | |
result = { | |
"success": False, | |
"error": "Authentication failed", | |
"message": "Please try again or check your internet connection.", | |
"instructions": [ | |
"Make sure you have internet connection", | |
"Ensure you complete the authentication in the browser", | |
"Try running 'python setup_oauth.py' if problems persist" | |
] | |
} | |
return json.dumps(result, indent=2) | |
except Exception as e: | |
logger.error("Error in authenticate_user: %s", e) | |
error_result = { | |
"success": False, | |
"error": str(e), | |
"message": "Authentication failed due to an error." | |
} | |
return json.dumps(error_result, indent=2) | |
def switch_account(target_email: str) -> str: | |
""" | |
Switch to a different authenticated Gmail account. | |
Args: | |
target_email (str): Email address to switch to | |
Returns: | |
str: JSON string containing switch result | |
""" | |
try: | |
logger.info("Switching to account: %s", target_email) | |
# Check if target account is authenticated | |
if not oauth_manager.is_authenticated(target_email): | |
return json.dumps({ | |
"error": "Account not authenticated", | |
"message": f"Account '{target_email}' is not authenticated. Please authenticate first.", | |
"target_email": target_email, | |
"authenticated_accounts": list(oauth_manager.list_accounts().keys()) | |
}, indent=2) | |
# Switch account | |
success = oauth_manager.switch_account(target_email) | |
if success: | |
result = { | |
"success": True, | |
"message": f"Successfully switched to account: {target_email}", | |
"current_account": oauth_manager.get_current_account(), | |
"previous_account": None # Could track this if needed | |
} | |
else: | |
result = { | |
"success": False, | |
"error": "Failed to switch account", | |
"message": f"Could not switch to account: {target_email}", | |
"current_account": oauth_manager.get_current_account() | |
} | |
return json.dumps(result, indent=2) | |
except Exception as e: | |
logger.error("Error switching account: %s", e) | |
error_result = { | |
"success": False, | |
"error": str(e), | |
"message": f"Failed to switch to account: {target_email}" | |
} | |
return json.dumps(error_result, indent=2) | |
def list_accounts() -> str: | |
""" | |
List all authenticated Gmail accounts and their status. | |
Returns: | |
str: JSON string containing all accounts and their authentication status | |
""" | |
try: | |
logger.info("Listing all accounts") | |
accounts = oauth_manager.list_accounts() | |
current_account = oauth_manager.get_current_account() | |
result = { | |
"accounts": accounts, | |
"current_account": current_account, | |
"total_accounts": len(accounts), | |
"authenticated_accounts": [email for email, is_auth in accounts.items() if is_auth], | |
"message": f"Found {len(accounts)} stored accounts, currently using: {current_account or 'None'}" | |
} | |
return json.dumps(result, indent=2) | |
except Exception as e: | |
logger.error("Error listing accounts: %s", e) | |
error_result = { | |
"error": str(e), | |
"message": "Failed to list accounts" | |
} | |
return json.dumps(error_result, indent=2) | |
def remove_account(email_to_remove: str) -> str: | |
""" | |
Remove an authenticated Gmail account and its stored credentials. | |
Args: | |
email_to_remove (str): Email address to remove | |
Returns: | |
str: JSON string containing removal result | |
""" | |
try: | |
logger.info("Removing account: %s", email_to_remove) | |
# Check if account exists | |
accounts = oauth_manager.list_accounts() | |
if email_to_remove not in accounts: | |
return json.dumps({ | |
"error": "Account not found", | |
"message": f"Account '{email_to_remove}' not found in stored accounts.", | |
"available_accounts": list(accounts.keys()) | |
}, indent=2) | |
# Remove account | |
oauth_manager.remove_account(email_to_remove) | |
result = { | |
"success": True, | |
"message": f"Successfully removed account: {email_to_remove}", | |
"removed_account": email_to_remove, | |
"current_account": oauth_manager.get_current_account(), | |
"remaining_accounts": list(oauth_manager.list_accounts().keys()) | |
} | |
return json.dumps(result, indent=2) | |
except Exception as e: | |
logger.error("Error removing account: %s", e) | |
error_result = { | |
"success": False, | |
"error": str(e), | |
"message": f"Failed to remove account: {email_to_remove}" | |
} | |
return json.dumps(error_result, indent=2) | |
def search_emails(sender_keyword: str, start_date: str = "", end_date: str = "") -> str: | |
""" | |
Search for emails from a specific sender within a date range using OAuth authentication. | |
Args: | |
sender_keyword (str): The sender/company keyword to search for (e.g., "apple", "amazon") | |
start_date (str): Start date in DD-MMM-YYYY format (e.g., "01-Jan-2025"). If empty, defaults to 7 days ago. | |
end_date (str): End date in DD-MMM-YYYY format (e.g., "07-Jan-2025"). If empty, defaults to today. | |
Returns: | |
str: JSON string containing email search results and analysis | |
""" | |
try: | |
logger.info("OAuth Email search tool called with sender: %s, dates: %s to %s", sender_keyword, start_date, end_date) | |
# Check authentication | |
is_auth, auth_info = check_authentication() | |
if not is_auth: | |
return json.dumps({ | |
"error": "Not authenticated", | |
"message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'", | |
"auth_status": auth_info | |
}, indent=2) | |
# Set default date range if not provided | |
if not start_date or not end_date: | |
today = datetime.today() | |
if not end_date: | |
end_date = today.strftime("%d-%b-%Y") | |
if not start_date: | |
start_date = (today - timedelta(days=7)).strftime("%d-%b-%Y") | |
logger.info(f"Searching for emails with keyword '{sender_keyword}' between {start_date} and {end_date}") | |
# Use Gmail API scraper with OAuth | |
full_emails = gmail_scraper.search_emails(sender_keyword, start_date, end_date) | |
if not full_emails: | |
result = { | |
"sender_keyword": sender_keyword, | |
"date_range": f"{start_date} to {end_date}", | |
"email_summary": [], | |
"analysis": {"summary": f"No emails found for '{sender_keyword}' in the specified date range.", "insights": []}, | |
"email_count": 0, | |
"user_email": auth_info | |
} | |
return json.dumps(result, indent=2) | |
# Create summary version without full content | |
email_summary = [] | |
for email in full_emails: | |
summary_email = { | |
"date": email.get("date"), | |
"time": email.get("time"), | |
"subject": email.get("subject"), | |
"from": email.get("from", "Unknown Sender"), | |
"message_id": email.get("message_id"), | |
"gmail_id": email.get("gmail_id") | |
} | |
email_summary.append(summary_email) | |
# Auto-analyze the emails for insights (no OpenAI) | |
analysis = simple_analyze_emails(full_emails) | |
# Return summary info with analysis | |
result = { | |
"sender_keyword": sender_keyword, | |
"date_range": f"{start_date} to {end_date}", | |
"email_summary": email_summary, | |
"analysis": analysis, | |
"email_count": len(full_emails), | |
"user_email": auth_info | |
} | |
return json.dumps(result, indent=2) | |
except Exception as e: | |
logger.error("Error in search_emails: %s", e) | |
error_result = { | |
"error": str(e), | |
"sender_keyword": sender_keyword, | |
"message": "Failed to search emails." | |
} | |
return json.dumps(error_result, indent=2) | |
def get_email_details(message_id: str) -> str: | |
""" | |
Get full details of a specific email by its message ID using OAuth authentication. | |
Args: | |
message_id (str): The message ID of the email to retrieve | |
Returns: | |
str: JSON string containing the full email details | |
""" | |
try: | |
logger.info("Getting email details for message_id: %s", message_id) | |
# Check authentication | |
is_auth, auth_info = check_authentication() | |
if not is_auth: | |
return json.dumps({ | |
"error": "Not authenticated", | |
"message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'", | |
"auth_status": auth_info | |
}, indent=2) | |
# Get email using Gmail API | |
email = gmail_scraper.get_email_by_id(message_id) | |
if email: | |
email["user_email"] = auth_info | |
return json.dumps(email, indent=2) | |
else: | |
error_result = { | |
"error": f"No email found with message_id '{message_id}'", | |
"message": "Email may not exist or you may not have access to it.", | |
"user_email": auth_info | |
} | |
return json.dumps(error_result, indent=2) | |
except Exception as e: | |
logger.error("Error in get_email_details: %s", e) | |
error_result = { | |
"error": str(e), | |
"message_id": message_id, | |
"message": "Failed to retrieve email details." | |
} | |
return json.dumps(error_result, indent=2) | |
def analyze_email_patterns(sender_keyword: str, days_back: str = "30") -> str: | |
""" | |
Analyze email patterns from a specific sender over a given time period using OAuth authentication. | |
Args: | |
sender_keyword (str): The sender/company keyword to analyze (e.g., "amazon", "google") | |
days_back (str): Number of days to look back (default: "30") | |
Returns: | |
str: JSON string containing email pattern analysis | |
""" | |
try: | |
logger.info("Analyzing email patterns for sender: %s, days_back: %s", sender_keyword, days_back) | |
# Check authentication | |
is_auth, auth_info = check_authentication() | |
if not is_auth: | |
return json.dumps({ | |
"error": "Not authenticated", | |
"message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'", | |
"auth_status": auth_info | |
}, indent=2) | |
# Calculate date range | |
days_int = int(days_back) | |
end_date = datetime.today() | |
start_date = end_date - timedelta(days=days_int) | |
start_date_str = start_date.strftime("%d-%b-%Y") | |
end_date_str = end_date.strftime("%d-%b-%Y") | |
# Search for emails using Gmail API | |
full_emails = gmail_scraper.search_emails(sender_keyword, start_date_str, end_date_str) | |
if not full_emails: | |
result = { | |
"sender_keyword": sender_keyword, | |
"date_range": f"{start_date_str} to {end_date_str}", | |
"analysis": {"summary": f"No emails found from '{sender_keyword}' in the last {days_back} days.", "insights": []}, | |
"email_count": 0, | |
"user_email": auth_info | |
} | |
return json.dumps(result, indent=2) | |
# Analyze the emails (no OpenAI) | |
analysis = simple_analyze_emails(full_emails) | |
result = { | |
"sender_keyword": sender_keyword, | |
"date_range": f"{start_date_str} to {end_date_str}", | |
"analysis": analysis, | |
"email_count": len(full_emails), | |
"user_email": auth_info | |
} | |
return json.dumps(result, indent=2) | |
except Exception as e: | |
logger.error("Error in analyze_email_patterns: %s", e) | |
error_result = { | |
"error": str(e), | |
"sender_keyword": sender_keyword, | |
"message": "Failed to analyze email patterns." | |
} | |
return json.dumps(error_result, indent=2) | |
def get_authentication_status() -> str: | |
""" | |
Get current authentication status and account information. | |
Returns: | |
str: JSON string containing authentication status | |
""" | |
try: | |
current_account = oauth_manager.get_current_account() | |
is_auth = oauth_manager.is_authenticated() if current_account else False | |
all_accounts = oauth_manager.list_accounts() | |
result = { | |
"authenticated": is_auth, | |
"current_account": current_account, | |
"status": "authenticated" if is_auth else "not_authenticated", | |
"message": f"Current account: {current_account}" if is_auth else "No account selected or not authenticated", | |
"all_accounts": all_accounts, | |
"total_accounts": len(all_accounts), | |
"authenticated_accounts": [email for email, auth in all_accounts.items() if auth] | |
} | |
if not is_auth and not oauth_manager.client_secrets_file.exists(): | |
result["setup_required"] = True | |
result["message"] = "OAuth not configured. Please run 'python setup_oauth.py' first." | |
elif not is_auth and current_account: | |
result["message"] = f"Account {current_account} needs re-authentication" | |
elif not current_account and all_accounts: | |
result["message"] = "Accounts available but none selected. Use switch_account to select one." | |
return json.dumps(result, indent=2) | |
except Exception as e: | |
logger.error("Error checking authentication status: %s", e) | |
return json.dumps({ | |
"error": str(e), | |
"message": "Failed to check authentication status" | |
}, indent=2) | |
# Create Gradio interfaces | |
search_interface = gr.Interface( | |
fn=search_emails, | |
inputs=[ | |
gr.Textbox(label="Sender Keyword", placeholder="apple, amazon, etc."), | |
gr.Textbox(label="Start Date (Optional)", placeholder="01-Jan-2025 (leave empty for last 7 days)"), | |
gr.Textbox(label="End Date (Optional)", placeholder="07-Jan-2025 (leave empty for today)") | |
], | |
outputs=gr.Textbox(label="Search Results", lines=20), | |
title="Email Search (OAuth)", | |
description="Search your emails by sender keyword and date range with OAuth authentication" | |
) | |
details_interface = gr.Interface( | |
fn=get_email_details, | |
inputs=[ | |
gr.Textbox(label="Message ID", placeholder="Email message ID from search results") | |
], | |
outputs=gr.Textbox(label="Email Details", lines=20), | |
title="Email Details (OAuth)", | |
description="Get full details of a specific email by message ID with OAuth authentication" | |
) | |
analysis_interface = gr.Interface( | |
fn=analyze_email_patterns, | |
inputs=[ | |
gr.Textbox(label="Sender Keyword", placeholder="amazon, google, linkedin, etc."), | |
gr.Textbox(label="Days Back", value="30", placeholder="Number of days to analyze") | |
], | |
outputs=gr.Textbox(label="Analysis Results", lines=20), | |
title="Email Pattern Analysis (OAuth)", | |
description="Analyze email patterns from a specific sender over time with OAuth authentication" | |
) | |
auth_interface = gr.Interface( | |
fn=authenticate_user, | |
inputs=[], | |
outputs=gr.Textbox(label="Authentication Result", lines=10), | |
title="Authenticate with Gmail", | |
description="Click Submit to start OAuth authentication flow with Gmail" | |
) | |
status_interface = gr.Interface( | |
fn=get_authentication_status, | |
inputs=[], | |
outputs=gr.Textbox(label="Authentication Status", lines=15), | |
title="Authentication Status", | |
description="Check current authentication status and view all accounts" | |
) | |
switch_interface = gr.Interface( | |
fn=switch_account, | |
inputs=[ | |
gr.Textbox(label="Target Email", placeholder="[email protected]") | |
], | |
outputs=gr.Textbox(label="Switch Result", lines=10), | |
title="Switch Account", | |
description="Switch to a different authenticated Gmail account" | |
) | |
accounts_interface = gr.Interface( | |
fn=list_accounts, | |
inputs=[], | |
outputs=gr.Textbox(label="Accounts List", lines=15), | |
title="List All Accounts", | |
description="View all authenticated Gmail accounts and their status" | |
) | |
remove_interface = gr.Interface( | |
fn=remove_account, | |
inputs=[ | |
gr.Textbox(label="Email to Remove", placeholder="[email protected]") | |
], | |
outputs=gr.Textbox(label="Removal Result", lines=10), | |
title="Remove Account", | |
description="Remove an authenticated Gmail account and its credentials" | |
) | |
# Combine interfaces into a tabbed interface | |
demo = gr.TabbedInterface( | |
[auth_interface, status_interface, accounts_interface, switch_interface, remove_interface, search_interface, details_interface, analysis_interface], | |
["π Authenticate", "π Status", "π₯ All Accounts", "π Switch Account", "ποΈ Remove Account", "π§ Email Search", "π Email Details", "π Pattern Analysis"], | |
title="π§ Gmail Assistant MCP Server (Multi-Account OAuth)" | |
) | |
if __name__ == "__main__": | |
# Set environment variable to enable MCP server | |
import os | |
os.environ["GRADIO_MCP_SERVER"] = "True" | |
# Check authentication status on startup | |
current_account = oauth_manager.get_current_account() | |
all_accounts = oauth_manager.list_accounts() | |
if current_account and oauth_manager.is_authenticated(): | |
print(f"β Currently authenticated as: {current_account}") | |
if len(all_accounts) > 1: | |
print(f"π± {len(all_accounts)} total accounts available: {list(all_accounts.keys())}") | |
elif all_accounts: | |
print(f"π± {len(all_accounts)} stored accounts found: {list(all_accounts.keys())}") | |
print("β οΈ No current account selected. Use the web interface or Claude to switch accounts.") | |
else: | |
print("β No authenticated accounts. Users will need to authenticate through the web interface.") | |
print("π‘ Or run 'python setup_oauth.py' for initial setup.") | |
# Launch the server | |
demo.launch(share=False) | |
print("\nπ MCP Server is running!") | |
print("π MCP Endpoint: http://localhost:7860/gradio_api/mcp/sse") | |
print("π Copy this URL to your Claude Desktop MCP configuration") | |
print("\nπ Web Interface: http://localhost:7860") | |
print("π Use the web interface to authenticate and test the tools") |