MailQuery / agentic_implementation /email_mcp_server_oauth.py
Arun Raghav
removed open ai involvement
6b7ccb5
raw
history blame
23.8 kB
#!/usr/bin/env python3
"""
Gmail MCP Server with OAuth Authentication and Multi-Account Support
"""
import gradio as gr
import json
import os
from typing import Dict, List
from datetime import datetime, timedelta
from dotenv import load_dotenv
# Import OAuth-enabled modules
from tools import extract_query_info, analyze_emails
from gmail_api_scraper import GmailAPIScraper
from oauth_manager import oauth_manager
from logger import logger
load_dotenv()
# Initialize Gmail API scraper
gmail_scraper = GmailAPIScraper()
def check_authentication() -> tuple[bool, str]:
"""Check if user is authenticated and return status"""
current_account = oauth_manager.get_current_account()
if current_account and oauth_manager.is_authenticated():
return True, current_account
else:
return False, "Not authenticated"
def simple_analyze_emails(emails) -> dict:
"""
Simple email analysis without OpenAI - just basic statistics and patterns
"""
if not emails:
return {"summary": "No emails to analyze.", "insights": []}
# Basic statistics
total_count = len(emails)
# Group by sender
senders = {}
subjects = []
dates = []
for email in emails:
sender = email.get("from", "Unknown")
# Extract just the email domain for grouping
if "<" in sender and ">" in sender:
email_part = sender.split("<")[1].split(">")[0]
else:
email_part = sender
domain = email_part.split("@")[-1] if "@" in email_part else sender
senders[domain] = senders.get(domain, 0) + 1
subjects.append(email.get("subject", ""))
dates.append(email.get("date", ""))
# Create insights
insights = []
insights.append(f"Found {total_count} emails total")
if senders:
top_sender = max(senders.items(), key=lambda x: x[1])
insights.append(f"Most emails from: {top_sender[0]} ({top_sender[1]} emails)")
if len(senders) > 1:
insights.append(f"Emails from {len(senders)} different domains")
# Date range
if dates:
unique_dates = list(set(dates))
if len(unique_dates) > 1:
insights.append(f"Spanning {len(unique_dates)} different days")
# Subject analysis
if subjects:
# Count common words in subjects (simple approach)
all_words = []
for subject in subjects:
words = subject.lower().split()
all_words.extend([w for w in words if len(w) > 3]) # Only words longer than 3 chars
if all_words:
word_counts = {}
for word in all_words:
word_counts[word] = word_counts.get(word, 0) + 1
if word_counts:
common_word = max(word_counts.items(), key=lambda x: x[1])
if common_word[1] > 1:
insights.append(f"Common subject word: '{common_word[0]}' appears {common_word[1]} times")
summary = f"Analysis of {total_count} emails from {len(senders)} sender(s)"
return {
"summary": summary,
"insights": insights
}
def authenticate_user() -> str:
"""
Start OAuth authentication flow for Gmail access.
Opens a browser window for user to authenticate with Google.
Returns:
str: JSON string containing authentication result
"""
try:
logger.info("Starting OAuth authentication flow...")
# Check if OAuth is configured
if not oauth_manager.client_secrets_file.exists():
return json.dumps({
"error": "OAuth not configured",
"message": "Please run 'python setup_oauth.py' first to configure OAuth credentials.",
"success": False
}, indent=2)
# Start authentication
success = oauth_manager.authenticate_interactive()
if success:
user_email = oauth_manager.get_current_account()
result = {
"success": True,
"message": "Authentication successful! You can now use the email tools.",
"user_email": user_email,
"instructions": [
"Authentication completed successfully",
"You can now search emails, get email details, and analyze patterns",
f"Currently authenticated as: {user_email}"
]
}
else:
result = {
"success": False,
"error": "Authentication failed",
"message": "Please try again or check your internet connection.",
"instructions": [
"Make sure you have internet connection",
"Ensure you complete the authentication in the browser",
"Try running 'python setup_oauth.py' if problems persist"
]
}
return json.dumps(result, indent=2)
except Exception as e:
logger.error("Error in authenticate_user: %s", e)
error_result = {
"success": False,
"error": str(e),
"message": "Authentication failed due to an error."
}
return json.dumps(error_result, indent=2)
def switch_account(target_email: str) -> str:
"""
Switch to a different authenticated Gmail account.
Args:
target_email (str): Email address to switch to
Returns:
str: JSON string containing switch result
"""
try:
logger.info("Switching to account: %s", target_email)
# Check if target account is authenticated
if not oauth_manager.is_authenticated(target_email):
return json.dumps({
"error": "Account not authenticated",
"message": f"Account '{target_email}' is not authenticated. Please authenticate first.",
"target_email": target_email,
"authenticated_accounts": list(oauth_manager.list_accounts().keys())
}, indent=2)
# Switch account
success = oauth_manager.switch_account(target_email)
if success:
result = {
"success": True,
"message": f"Successfully switched to account: {target_email}",
"current_account": oauth_manager.get_current_account(),
"previous_account": None # Could track this if needed
}
else:
result = {
"success": False,
"error": "Failed to switch account",
"message": f"Could not switch to account: {target_email}",
"current_account": oauth_manager.get_current_account()
}
return json.dumps(result, indent=2)
except Exception as e:
logger.error("Error switching account: %s", e)
error_result = {
"success": False,
"error": str(e),
"message": f"Failed to switch to account: {target_email}"
}
return json.dumps(error_result, indent=2)
def list_accounts() -> str:
"""
List all authenticated Gmail accounts and their status.
Returns:
str: JSON string containing all accounts and their authentication status
"""
try:
logger.info("Listing all accounts")
accounts = oauth_manager.list_accounts()
current_account = oauth_manager.get_current_account()
result = {
"accounts": accounts,
"current_account": current_account,
"total_accounts": len(accounts),
"authenticated_accounts": [email for email, is_auth in accounts.items() if is_auth],
"message": f"Found {len(accounts)} stored accounts, currently using: {current_account or 'None'}"
}
return json.dumps(result, indent=2)
except Exception as e:
logger.error("Error listing accounts: %s", e)
error_result = {
"error": str(e),
"message": "Failed to list accounts"
}
return json.dumps(error_result, indent=2)
def remove_account(email_to_remove: str) -> str:
"""
Remove an authenticated Gmail account and its stored credentials.
Args:
email_to_remove (str): Email address to remove
Returns:
str: JSON string containing removal result
"""
try:
logger.info("Removing account: %s", email_to_remove)
# Check if account exists
accounts = oauth_manager.list_accounts()
if email_to_remove not in accounts:
return json.dumps({
"error": "Account not found",
"message": f"Account '{email_to_remove}' not found in stored accounts.",
"available_accounts": list(accounts.keys())
}, indent=2)
# Remove account
oauth_manager.remove_account(email_to_remove)
result = {
"success": True,
"message": f"Successfully removed account: {email_to_remove}",
"removed_account": email_to_remove,
"current_account": oauth_manager.get_current_account(),
"remaining_accounts": list(oauth_manager.list_accounts().keys())
}
return json.dumps(result, indent=2)
except Exception as e:
logger.error("Error removing account: %s", e)
error_result = {
"success": False,
"error": str(e),
"message": f"Failed to remove account: {email_to_remove}"
}
return json.dumps(error_result, indent=2)
def search_emails(sender_keyword: str, start_date: str = "", end_date: str = "") -> str:
"""
Search for emails from a specific sender within a date range using OAuth authentication.
Args:
sender_keyword (str): The sender/company keyword to search for (e.g., "apple", "amazon")
start_date (str): Start date in DD-MMM-YYYY format (e.g., "01-Jan-2025"). If empty, defaults to 7 days ago.
end_date (str): End date in DD-MMM-YYYY format (e.g., "07-Jan-2025"). If empty, defaults to today.
Returns:
str: JSON string containing email search results and analysis
"""
try:
logger.info("OAuth Email search tool called with sender: %s, dates: %s to %s", sender_keyword, start_date, end_date)
# Check authentication
is_auth, auth_info = check_authentication()
if not is_auth:
return json.dumps({
"error": "Not authenticated",
"message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'",
"auth_status": auth_info
}, indent=2)
# Set default date range if not provided
if not start_date or not end_date:
today = datetime.today()
if not end_date:
end_date = today.strftime("%d-%b-%Y")
if not start_date:
start_date = (today - timedelta(days=7)).strftime("%d-%b-%Y")
logger.info(f"Searching for emails with keyword '{sender_keyword}' between {start_date} and {end_date}")
# Use Gmail API scraper with OAuth
full_emails = gmail_scraper.search_emails(sender_keyword, start_date, end_date)
if not full_emails:
result = {
"sender_keyword": sender_keyword,
"date_range": f"{start_date} to {end_date}",
"email_summary": [],
"analysis": {"summary": f"No emails found for '{sender_keyword}' in the specified date range.", "insights": []},
"email_count": 0,
"user_email": auth_info
}
return json.dumps(result, indent=2)
# Create summary version without full content
email_summary = []
for email in full_emails:
summary_email = {
"date": email.get("date"),
"time": email.get("time"),
"subject": email.get("subject"),
"from": email.get("from", "Unknown Sender"),
"message_id": email.get("message_id"),
"gmail_id": email.get("gmail_id")
}
email_summary.append(summary_email)
# Auto-analyze the emails for insights (no OpenAI)
analysis = simple_analyze_emails(full_emails)
# Return summary info with analysis
result = {
"sender_keyword": sender_keyword,
"date_range": f"{start_date} to {end_date}",
"email_summary": email_summary,
"analysis": analysis,
"email_count": len(full_emails),
"user_email": auth_info
}
return json.dumps(result, indent=2)
except Exception as e:
logger.error("Error in search_emails: %s", e)
error_result = {
"error": str(e),
"sender_keyword": sender_keyword,
"message": "Failed to search emails."
}
return json.dumps(error_result, indent=2)
def get_email_details(message_id: str) -> str:
"""
Get full details of a specific email by its message ID using OAuth authentication.
Args:
message_id (str): The message ID of the email to retrieve
Returns:
str: JSON string containing the full email details
"""
try:
logger.info("Getting email details for message_id: %s", message_id)
# Check authentication
is_auth, auth_info = check_authentication()
if not is_auth:
return json.dumps({
"error": "Not authenticated",
"message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'",
"auth_status": auth_info
}, indent=2)
# Get email using Gmail API
email = gmail_scraper.get_email_by_id(message_id)
if email:
email["user_email"] = auth_info
return json.dumps(email, indent=2)
else:
error_result = {
"error": f"No email found with message_id '{message_id}'",
"message": "Email may not exist or you may not have access to it.",
"user_email": auth_info
}
return json.dumps(error_result, indent=2)
except Exception as e:
logger.error("Error in get_email_details: %s", e)
error_result = {
"error": str(e),
"message_id": message_id,
"message": "Failed to retrieve email details."
}
return json.dumps(error_result, indent=2)
def analyze_email_patterns(sender_keyword: str, days_back: str = "30") -> str:
"""
Analyze email patterns from a specific sender over a given time period using OAuth authentication.
Args:
sender_keyword (str): The sender/company keyword to analyze (e.g., "amazon", "google")
days_back (str): Number of days to look back (default: "30")
Returns:
str: JSON string containing email pattern analysis
"""
try:
logger.info("Analyzing email patterns for sender: %s, days_back: %s", sender_keyword, days_back)
# Check authentication
is_auth, auth_info = check_authentication()
if not is_auth:
return json.dumps({
"error": "Not authenticated",
"message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'",
"auth_status": auth_info
}, indent=2)
# Calculate date range
days_int = int(days_back)
end_date = datetime.today()
start_date = end_date - timedelta(days=days_int)
start_date_str = start_date.strftime("%d-%b-%Y")
end_date_str = end_date.strftime("%d-%b-%Y")
# Search for emails using Gmail API
full_emails = gmail_scraper.search_emails(sender_keyword, start_date_str, end_date_str)
if not full_emails:
result = {
"sender_keyword": sender_keyword,
"date_range": f"{start_date_str} to {end_date_str}",
"analysis": {"summary": f"No emails found from '{sender_keyword}' in the last {days_back} days.", "insights": []},
"email_count": 0,
"user_email": auth_info
}
return json.dumps(result, indent=2)
# Analyze the emails (no OpenAI)
analysis = simple_analyze_emails(full_emails)
result = {
"sender_keyword": sender_keyword,
"date_range": f"{start_date_str} to {end_date_str}",
"analysis": analysis,
"email_count": len(full_emails),
"user_email": auth_info
}
return json.dumps(result, indent=2)
except Exception as e:
logger.error("Error in analyze_email_patterns: %s", e)
error_result = {
"error": str(e),
"sender_keyword": sender_keyword,
"message": "Failed to analyze email patterns."
}
return json.dumps(error_result, indent=2)
def get_authentication_status() -> str:
"""
Get current authentication status and account information.
Returns:
str: JSON string containing authentication status
"""
try:
current_account = oauth_manager.get_current_account()
is_auth = oauth_manager.is_authenticated() if current_account else False
all_accounts = oauth_manager.list_accounts()
result = {
"authenticated": is_auth,
"current_account": current_account,
"status": "authenticated" if is_auth else "not_authenticated",
"message": f"Current account: {current_account}" if is_auth else "No account selected or not authenticated",
"all_accounts": all_accounts,
"total_accounts": len(all_accounts),
"authenticated_accounts": [email for email, auth in all_accounts.items() if auth]
}
if not is_auth and not oauth_manager.client_secrets_file.exists():
result["setup_required"] = True
result["message"] = "OAuth not configured. Please run 'python setup_oauth.py' first."
elif not is_auth and current_account:
result["message"] = f"Account {current_account} needs re-authentication"
elif not current_account and all_accounts:
result["message"] = "Accounts available but none selected. Use switch_account to select one."
return json.dumps(result, indent=2)
except Exception as e:
logger.error("Error checking authentication status: %s", e)
return json.dumps({
"error": str(e),
"message": "Failed to check authentication status"
}, indent=2)
# Create Gradio interfaces
search_interface = gr.Interface(
fn=search_emails,
inputs=[
gr.Textbox(label="Sender Keyword", placeholder="apple, amazon, etc."),
gr.Textbox(label="Start Date (Optional)", placeholder="01-Jan-2025 (leave empty for last 7 days)"),
gr.Textbox(label="End Date (Optional)", placeholder="07-Jan-2025 (leave empty for today)")
],
outputs=gr.Textbox(label="Search Results", lines=20),
title="Email Search (OAuth)",
description="Search your emails by sender keyword and date range with OAuth authentication"
)
details_interface = gr.Interface(
fn=get_email_details,
inputs=[
gr.Textbox(label="Message ID", placeholder="Email message ID from search results")
],
outputs=gr.Textbox(label="Email Details", lines=20),
title="Email Details (OAuth)",
description="Get full details of a specific email by message ID with OAuth authentication"
)
analysis_interface = gr.Interface(
fn=analyze_email_patterns,
inputs=[
gr.Textbox(label="Sender Keyword", placeholder="amazon, google, linkedin, etc."),
gr.Textbox(label="Days Back", value="30", placeholder="Number of days to analyze")
],
outputs=gr.Textbox(label="Analysis Results", lines=20),
title="Email Pattern Analysis (OAuth)",
description="Analyze email patterns from a specific sender over time with OAuth authentication"
)
auth_interface = gr.Interface(
fn=authenticate_user,
inputs=[],
outputs=gr.Textbox(label="Authentication Result", lines=10),
title="Authenticate with Gmail",
description="Click Submit to start OAuth authentication flow with Gmail"
)
status_interface = gr.Interface(
fn=get_authentication_status,
inputs=[],
outputs=gr.Textbox(label="Authentication Status", lines=15),
title="Authentication Status",
description="Check current authentication status and view all accounts"
)
switch_interface = gr.Interface(
fn=switch_account,
inputs=[
gr.Textbox(label="Target Email", placeholder="[email protected]")
],
outputs=gr.Textbox(label="Switch Result", lines=10),
title="Switch Account",
description="Switch to a different authenticated Gmail account"
)
accounts_interface = gr.Interface(
fn=list_accounts,
inputs=[],
outputs=gr.Textbox(label="Accounts List", lines=15),
title="List All Accounts",
description="View all authenticated Gmail accounts and their status"
)
remove_interface = gr.Interface(
fn=remove_account,
inputs=[
gr.Textbox(label="Email to Remove", placeholder="[email protected]")
],
outputs=gr.Textbox(label="Removal Result", lines=10),
title="Remove Account",
description="Remove an authenticated Gmail account and its credentials"
)
# Combine interfaces into a tabbed interface
demo = gr.TabbedInterface(
[auth_interface, status_interface, accounts_interface, switch_interface, remove_interface, search_interface, details_interface, analysis_interface],
["πŸ” Authenticate", "πŸ“Š Status", "πŸ‘₯ All Accounts", "πŸ”„ Switch Account", "πŸ—‘οΈ Remove Account", "πŸ“§ Email Search", "πŸ“„ Email Details", "πŸ“ˆ Pattern Analysis"],
title="πŸ“§ Gmail Assistant MCP Server (Multi-Account OAuth)"
)
if __name__ == "__main__":
# Set environment variable to enable MCP server
import os
os.environ["GRADIO_MCP_SERVER"] = "True"
# Check authentication status on startup
current_account = oauth_manager.get_current_account()
all_accounts = oauth_manager.list_accounts()
if current_account and oauth_manager.is_authenticated():
print(f"βœ… Currently authenticated as: {current_account}")
if len(all_accounts) > 1:
print(f"πŸ“± {len(all_accounts)} total accounts available: {list(all_accounts.keys())}")
elif all_accounts:
print(f"πŸ“± {len(all_accounts)} stored accounts found: {list(all_accounts.keys())}")
print("⚠️ No current account selected. Use the web interface or Claude to switch accounts.")
else:
print("❌ No authenticated accounts. Users will need to authenticate through the web interface.")
print("πŸ’‘ Or run 'python setup_oauth.py' for initial setup.")
# Launch the server
demo.launch(share=False)
print("\nπŸš€ MCP Server is running!")
print("πŸ“ MCP Endpoint: http://localhost:7860/gradio_api/mcp/sse")
print("πŸ“– Copy this URL to your Claude Desktop MCP configuration")
print("\nπŸ”— Web Interface: http://localhost:7860")
print("πŸ“ Use the web interface to authenticate and test the tools")