|
|
|
""" |
|
Gmail MCP Server with OAuth Authentication and Multi-Account Support |
|
""" |
|
|
|
import base64
import html
import json
import os
from collections import Counter
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from typing import Dict, List

import gradio as gr
from dotenv import load_dotenv
from googleapiclient.errors import HttpError

from gmail_api_scraper import GmailAPIScraper
from oauth_manager import oauth_manager
from logger import logger
|
|
|
# Pull variables from a local .env file (if any) into the process environment
# before the OAuth credentials are looked up below.
load_dotenv()

# Bootstrap the OAuth client-secrets file from the environment when it does not
# exist yet. Missing variables are reported instead of raising KeyError at
# import time, so the tools below can still load and return their own
# "OAuth not configured" responses.
if not oauth_manager.client_secrets_file.exists():
    _client_id = os.environ.get("GOOGLE_CLIENT_ID")
    _client_secret = os.environ.get("GOOGLE_CLIENT_SECRET")
    if _client_id and _client_secret:
        oauth_manager.setup_client_secrets(_client_id, _client_secret)
    else:
        logger.warning(
            "GOOGLE_CLIENT_ID / GOOGLE_CLIENT_SECRET are not set and no client "
            "secrets file exists; OAuth must be configured before authenticating."
        )

# Shared scraper instance used by the search / details / analysis tools.
gmail_scraper = GmailAPIScraper()
|
|
|
def check_authentication() -> tuple[bool, str]:
    """Report whether an authenticated account is currently selected.

    Returns:
        tuple[bool, str]: ``(True, <current account>)`` when an account is
        selected and ``oauth_manager.is_authenticated()`` is truthy, otherwise
        ``(False, "Not authenticated")``.
    """
    account = oauth_manager.get_current_account()
    # The authentication check is only performed when an account is selected.
    if not account or not oauth_manager.is_authenticated():
        return False, "Not authenticated"
    return True, account
|
|
|
def simple_analyze_emails(emails) -> dict:
    """Summarise a list of email dicts with basic statistics (no external services).

    Args:
        emails: Iterable of dict-like email records. The keys read are
            ``"from"``, ``"subject"`` and ``"date"``; any of them may be
            missing or ``None``.

    Returns:
        dict: ``{"summary": str, "insights": list[str]}``. For empty input the
        summary is ``"No emails to analyze."`` and the insight list is empty.
    """
    if not emails:
        return {"summary": "No emails to analyze.", "insights": []}

    total_count = len(emails)

    domain_counts = Counter()
    subjects = []
    dates = []

    for email in emails:
        # A header can be present but None/empty; treat that like a missing one.
        sender = email.get("from") or "Unknown"

        # "Display Name <user@host>" -> "user@host"; otherwise use the raw value.
        if "<" in sender and ">" in sender:
            address = sender.split("<")[1].split(">")[0]
        else:
            address = sender

        # Group by the part after the last "@"; fall back to the whole sender
        # string when no address is recognisable.
        domain = address.split("@")[-1] if "@" in address else sender
        domain_counts[domain] += 1

        subjects.append(email.get("subject") or "")

        # Only real date values participate in the "different days" count.
        date_value = email.get("date")
        if date_value:
            dates.append(date_value)

    insights = [f"Found {total_count} emails total"]

    if domain_counts:
        top_domain, top_count = domain_counts.most_common(1)[0]
        insights.append(f"Most emails from: {top_domain} ({top_count} emails)")

        if len(domain_counts) > 1:
            insights.append(f"Emails from {len(domain_counts)} different domains")

    unique_dates = set(dates)
    if len(unique_dates) > 1:
        insights.append(f"Spanning {len(unique_dates)} different days")

    # Count subject words longer than three characters (case-insensitive).
    word_counts = Counter(
        word
        for subject in subjects
        for word in subject.lower().split()
        if len(word) > 3
    )
    if word_counts:
        common_word, occurrences = word_counts.most_common(1)[0]
        # A word seen only once is not a pattern worth reporting.
        if occurrences > 1:
            insights.append(
                f"Common subject word: '{common_word}' appears {occurrences} times"
            )

    summary = f"Analysis of {total_count} emails from {len(domain_counts)} sender(s)"

    return {
        "summary": summary,
        "insights": insights
    }
|
|
|
def authenticate_user() -> str:
    """
    Start the OAuth authentication flow for Gmail access.

    Returns immediately when an account is already authenticated or when no
    client-secrets file exists. Otherwise the interactive flow is attempted;
    if it does not succeed, the pending authorization URL (when available) is
    returned so the user can complete authentication manually.

    Returns:
        str: JSON string containing authentication result
    """
    try:
        logger.info("Starting OAuth authentication flow...")

        # Nothing to do when a valid session already exists.
        if oauth_manager.is_authenticated():
            account = oauth_manager.get_current_account()
            already_done = {
                "success": True,
                "message": "Already authenticated!",
                "user_email": account,
                "instructions": [
                    "You are already authenticated and ready to use email tools",
                    f"Currently authenticated as: {account}"
                ]
            }
            return json.dumps(already_done, indent=2)

        # Without client secrets the flow cannot be started at all.
        if not oauth_manager.client_secrets_file.exists():
            not_configured = {
                "error": "OAuth not configured",
                "message": "Please run 'python setup_oauth.py' first to configure OAuth credentials.",
                "success": False
            }
            return json.dumps(not_configured, indent=2)

        if oauth_manager.authenticate_interactive():
            account = oauth_manager.get_current_account()
            return json.dumps({
                "success": True,
                "message": "Authentication successful! You can now use the email tools.",
                "user_email": account,
                "instructions": [
                    "Authentication completed successfully",
                    "You can now search emails, get email details, and analyze patterns",
                    f"Currently authenticated as: {account}"
                ]
            }, indent=2)

        # Interactive flow did not complete: hand the user a URL if one is pending.
        pending_url = oauth_manager.get_pending_auth_url()
        redirect_uri = oauth_manager.get_hf_redirect_uri()

        if not pending_url:
            return json.dumps({
                "success": False,
                "error": "Failed to generate authentication URL",
                "message": "Could not start authentication process. Check your OAuth configuration."
            }, indent=2)

        manual_steps = [
            "Authentication URL has been generated",
            "Please click the link below to authenticate:",
            "1. Click the authentication URL",
            "2. Sign in with your Google account",
            "3. Grant Gmail access permissions",
            "4. You'll be redirected back automatically",
            "5. Try clicking 'Submit' again after completing authentication"
        ]
        return json.dumps({
            "success": False,
            "message": "Manual authentication required",
            "auth_url": pending_url,
            "callback_url": redirect_uri,
            "instructions": manual_steps,
            "note": "After completing authentication in the popup, click Submit again to verify"
        }, indent=2)

    except Exception as e:
        logger.error("Error in authenticate_user: %s", e)
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": "Authentication failed due to an error."
        }, indent=2)
|
|
|
|
|
def handle_oauth_callback(auth_code: str) -> str:
    """Handle OAuth callback for Hugging Face Spaces

    Exchanges the authorization code via
    ``oauth_manager.complete_hf_spaces_auth`` and renders a small HTML page
    describing the outcome. Dynamic values (the account name and any exception
    text) are HTML-escaped before being embedded in the page.

    Args:
        auth_code: Authorization code from OAuth callback

    Returns:
        HTML response string
    """
    try:
        if not auth_code:
            return """
            <html>
            <head><title>OAuth Error</title></head>
            <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;">
                <h1 style="color: #d32f2f;">Authentication Error</h1>
                <p>No authorization code received.</p>
                <button onclick="window.close()" style="padding: 10px 20px; margin: 20px; background: #1976d2; color: white; border: none; border-radius: 4px; cursor: pointer;">Close Window</button>
            </body>
            </html>
            """

        success = oauth_manager.complete_hf_spaces_auth(auth_code)

        if success:
            # Escape before interpolation: the value ends up inside markup.
            safe_email = html.escape(str(oauth_manager.get_current_account()))
            # NOTE(review): the glyph before "Authentication Successful!" looks
            # like a mis-decoded emoji; it is kept as found - confirm encoding.
            return f"""
            <html>
            <head><title>OAuth Success</title></head>
            <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;">
                <h1 style="color: #2e7d32;">π Authentication Successful!</h1>
                <p>You are now authenticated as:</p>
                <p style="font-weight: bold; font-size: 18px; color: #1976d2;">{safe_email}</p>
                <p>You can now close this window and return to the main application.</p>
                <p style="color: #666; font-size: 14px;">This window will close automatically in 5 seconds...</p>
                <button onclick="window.close()" style="padding: 10px 20px; margin: 20px; background: #2e7d32; color: white; border: none; border-radius: 4px; cursor: pointer;">Close Window</button>
                <script>
                    setTimeout(function() {{
                        window.close();
                    }}, 5000);
                </script>
            </body>
            </html>
            """
        else:
            return """
            <html>
            <head><title>OAuth Error</title></head>
            <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;">
                <h1 style="color: #d32f2f;">Authentication Failed</h1>
                <p>Unable to complete authentication. Please try again.</p>
                <p>Make sure you granted all required permissions.</p>
                <button onclick="window.close()" style="padding: 10px 20px; margin: 20px; background: #d32f2f; color: white; border: none; border-radius: 4px; cursor: pointer;">Close Window</button>
            </body>
            </html>
            """

    except Exception as e:
        logger.error("Error handling OAuth callback: %s", e)
        # Exception text may echo request data, so it must not reach the page raw.
        safe_error = html.escape(str(e))
        return f"""
        <html>
        <head><title>OAuth Error</title></head>
        <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;">
            <h1 style="color: #d32f2f;">Authentication Error</h1>
            <p>An error occurred during authentication:</p>
            <pre style="background: #f5f5f5; padding: 10px; border-radius: 4px; text-align: left; max-width: 500px; margin: 0 auto;">{safe_error}</pre>
            <button onclick="window.close()" style="padding: 10px 20px; margin: 20px; background: #d32f2f; color: white; border: none; border-radius: 4px; cursor: pointer;">Close Window</button>
        </body>
        </html>
        """
|
|
|
|
|
def switch_account(target_email: str) -> str:
    """
    Switch to a different authenticated Gmail account.

    The target must already be authenticated; otherwise an error listing the
    stored accounts is returned and nothing is switched.

    Args:
        target_email (str): Email address to switch to

    Returns:
        str: JSON string containing switch result. On success it includes the
        account that was active before the switch as ``previous_account``.
    """
    try:
        logger.info("Switching to account: %s", target_email)

        if not oauth_manager.is_authenticated(target_email):
            return json.dumps({
                "success": False,
                "error": "Account not authenticated",
                "message": f"Account '{target_email}' is not authenticated. Please authenticate first.",
                "target_email": target_email,
                "authenticated_accounts": list(oauth_manager.list_accounts().keys())
            }, indent=2)

        # Remember the active account first so the response can report it.
        previous_account = oauth_manager.get_current_account()

        success = oauth_manager.switch_account(target_email)

        if success:
            result = {
                "success": True,
                "message": f"Successfully switched to account: {target_email}",
                "current_account": oauth_manager.get_current_account(),
                "previous_account": previous_account
            }
        else:
            result = {
                "success": False,
                "error": "Failed to switch account",
                "message": f"Could not switch to account: {target_email}",
                "current_account": oauth_manager.get_current_account()
            }

        return json.dumps(result, indent=2)

    except Exception as e:
        logger.error("Error switching account: %s", e)
        error_result = {
            "success": False,
            "error": str(e),
            "message": f"Failed to switch to account: {target_email}"
        }
        return json.dumps(error_result, indent=2)
|
|
|
def list_accounts() -> str:
    """
    List every stored Gmail account together with its authentication flag.

    Returns:
        str: JSON string containing all accounts and their authentication status
    """
    try:
        logger.info("Listing all accounts")

        stored = oauth_manager.list_accounts()
        active = oauth_manager.get_current_account()
        count = len(stored)

        # Keep only the accounts whose stored flag is truthy.
        authenticated = []
        for address, flag in stored.items():
            if flag:
                authenticated.append(address)

        return json.dumps({
            "accounts": stored,
            "current_account": active,
            "total_accounts": count,
            "authenticated_accounts": authenticated,
            "message": f"Found {count} stored accounts, currently using: {active or 'None'}"
        }, indent=2)

    except Exception as e:
        logger.error("Error listing accounts: %s", e)
        return json.dumps({
            "error": str(e),
            "message": "Failed to list accounts"
        }, indent=2)
|
|
|
def remove_account(email_to_remove: str) -> str:
    """
    Remove a stored Gmail account through the OAuth manager.

    Args:
        email_to_remove (str): Email address to remove

    Returns:
        str: JSON string containing removal result
    """
    try:
        logger.info("Removing account: %s", email_to_remove)

        known = oauth_manager.list_accounts()

        # Unknown address: report what is available instead of removing anything.
        if email_to_remove not in known:
            not_found = {
                "error": "Account not found",
                "message": f"Account '{email_to_remove}' not found in stored accounts.",
                "available_accounts": list(known.keys())
            }
            return json.dumps(not_found, indent=2)

        oauth_manager.remove_account(email_to_remove)

        # Re-query the manager so the response reflects the state after removal.
        return json.dumps({
            "success": True,
            "message": f"Successfully removed account: {email_to_remove}",
            "removed_account": email_to_remove,
            "current_account": oauth_manager.get_current_account(),
            "remaining_accounts": list(oauth_manager.list_accounts().keys())
        }, indent=2)

    except Exception as e:
        logger.error("Error removing account: %s", e)
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": f"Failed to remove account: {email_to_remove}"
        }, indent=2)
|
|
|
def search_emails(sender_keyword: str, start_date: str = "", end_date: str = "") -> str:
    """
    Search for emails matching a sender keyword within a date range, using the
    currently authenticated account.

    Args:
        sender_keyword (str): The sender/company keyword to search for (e.g., "apple", "amazon")
        start_date (str): Start date in DD-MMM-YYYY format (e.g., "01-Jan-2025"). If empty, defaults to 7 days ago.
        end_date (str): End date in DD-MMM-YYYY format (e.g., "07-Jan-2025"). If empty, defaults to today.

    Returns:
        str: JSON string containing email search results and analysis
    """
    try:
        logger.info("OAuth Email search tool called with sender: %s, dates: %s to %s", sender_keyword, start_date, end_date)

        authenticated, account = check_authentication()
        if not authenticated:
            return json.dumps({
                "error": "Not authenticated",
                "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'",
                "auth_status": account
            }, indent=2)

        # Fill in whichever end of the range was left blank.
        if not (start_date and end_date):
            now = datetime.today()
            if not end_date:
                end_date = now.strftime("%d-%b-%Y")
            if not start_date:
                week_ago = now - timedelta(days=7)
                start_date = week_ago.strftime("%d-%b-%Y")

        logger.info("Searching for emails with keyword '%s' between %s and %s", sender_keyword, start_date, end_date)

        matches = gmail_scraper.search_emails(sender_keyword, start_date, end_date)
        date_range = f"{start_date} to {end_date}"

        if not matches:
            return json.dumps({
                "sender_keyword": sender_keyword,
                "date_range": date_range,
                "email_summary": [],
                "analysis": {"summary": f"No emails found for '{sender_keyword}' in the specified date range.", "insights": []},
                "email_count": 0,
                "user_email": account
            }, indent=2)

        # Expose only the header-level fields of each hit in the summary list.
        email_summary = [
            {
                "date": item.get("date"),
                "time": item.get("time"),
                "subject": item.get("subject"),
                "from": item.get("from", "Unknown Sender"),
                "message_id": item.get("message_id"),
                "gmail_id": item.get("gmail_id")
            }
            for item in matches
        ]

        analysis = simple_analyze_emails(matches)

        return json.dumps({
            "sender_keyword": sender_keyword,
            "date_range": date_range,
            "email_summary": email_summary,
            "analysis": analysis,
            "email_count": len(matches),
            "user_email": account
        }, indent=2)

    except Exception as e:
        logger.error("Error in search_emails: %s", e)
        return json.dumps({
            "error": str(e),
            "sender_keyword": sender_keyword,
            "message": "Failed to search emails."
        }, indent=2)
|
|
|
def get_email_details(message_id: str) -> str:
    """
    Fetch one email by its message ID using the currently authenticated account.

    Args:
        message_id (str): The message ID of the email to retrieve

    Returns:
        str: JSON string containing the full email details
    """
    try:
        logger.info("Getting email details for message_id: %s", message_id)

        authenticated, account = check_authentication()
        if not authenticated:
            return json.dumps({
                "error": "Not authenticated",
                "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'",
                "auth_status": account
            }, indent=2)

        found = gmail_scraper.get_email_by_id(message_id)

        if not found:
            return json.dumps({
                "error": f"No email found with message_id '{message_id}'",
                "message": "Email may not exist or you may not have access to it.",
                "user_email": account
            }, indent=2)

        # Tag the record with the account it was fetched for before serialising.
        found["user_email"] = account
        return json.dumps(found, indent=2)

    except Exception as e:
        logger.error("Error in get_email_details: %s", e)
        return json.dumps({
            "error": str(e),
            "message_id": message_id,
            "message": "Failed to retrieve email details."
        }, indent=2)
|
|
|
def analyze_email_patterns(sender_keyword: str, days_back: str = "30") -> str:
    """
    Analyze email patterns from a specific sender over a given time period using OAuth authentication.

    Args:
        sender_keyword (str): The sender/company keyword to analyze (e.g., "amazon", "google")
        days_back (str): Number of days to look back (default: "30"). Must
            parse as a non-negative whole number; anything else yields an
            error response instead of a search.

    Returns:
        str: JSON string containing email pattern analysis
    """
    try:
        logger.info("Analyzing email patterns for sender: %s, days_back: %s", sender_keyword, days_back)

        is_auth, auth_info = check_authentication()
        if not is_auth:
            return json.dumps({
                "error": "Not authenticated",
                "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'",
                "auth_status": auth_info
            }, indent=2)

        # Validate explicitly: a bad value would otherwise surface as a cryptic
        # int() error, and a negative one would produce an inverted date range.
        try:
            days_int = int(str(days_back).strip())
        except (TypeError, ValueError):
            days_int = None
        if days_int is None or days_int < 0:
            return json.dumps({
                "error": "Invalid days_back value",
                "sender_keyword": sender_keyword,
                "message": f"'days_back' must be a non-negative whole number of days, got: {days_back!r}"
            }, indent=2)

        end_date = datetime.today()
        start_date = end_date - timedelta(days=days_int)

        start_date_str = start_date.strftime("%d-%b-%Y")
        end_date_str = end_date.strftime("%d-%b-%Y")

        full_emails = gmail_scraper.search_emails(sender_keyword, start_date_str, end_date_str)

        if not full_emails:
            result = {
                "sender_keyword": sender_keyword,
                "date_range": f"{start_date_str} to {end_date_str}",
                "analysis": {"summary": f"No emails found from '{sender_keyword}' in the last {days_back} days.", "insights": []},
                "email_count": 0,
                "user_email": auth_info
            }
            return json.dumps(result, indent=2)

        analysis = simple_analyze_emails(full_emails)

        result = {
            "sender_keyword": sender_keyword,
            "date_range": f"{start_date_str} to {end_date_str}",
            "analysis": analysis,
            "email_count": len(full_emails),
            "user_email": auth_info
        }

        return json.dumps(result, indent=2)

    except Exception as e:
        logger.error("Error in analyze_email_patterns: %s", e)
        error_result = {
            "error": str(e),
            "sender_keyword": sender_keyword,
            "message": "Failed to analyze email patterns."
        }
        return json.dumps(error_result, indent=2)
|
|
|
def get_authentication_status() -> str:
    """
    Describe the current authentication state and the stored accounts.

    Returns:
        str: JSON string containing authentication status
    """
    try:
        active = oauth_manager.get_current_account()

        # Authentication is only checked when an account is selected.
        is_auth = False
        if active:
            is_auth = oauth_manager.is_authenticated()

        stored = oauth_manager.list_accounts()

        if is_auth:
            status, message = "authenticated", f"Current account: {active}"
        else:
            status, message = "not_authenticated", "No account selected or not authenticated"

        result = {
            "authenticated": is_auth,
            "current_account": active,
            "status": status,
            "message": message,
            "all_accounts": stored,
            "total_accounts": len(stored),
            "authenticated_accounts": [address for address, flag in stored.items() if flag]
        }

        # Replace the generic message with a more specific hint; first match wins.
        if not is_auth and not oauth_manager.client_secrets_file.exists():
            result["setup_required"] = True
            result["message"] = "OAuth not configured. Please run 'python setup_oauth.py' first."
        elif not is_auth and active:
            result["message"] = f"Account {active} needs re-authentication"
        elif not active and stored:
            result["message"] = "Accounts available but none selected. Use switch_account to select one."

        return json.dumps(result, indent=2)

    except Exception as e:
        logger.error("Error checking authentication status: %s", e)
        failure = {
            "error": str(e),
            "message": "Failed to check authentication status"
        }
        return json.dumps(failure, indent=2)
|
|
|
def send_email(recipient: str, subject: str, body: str) -> str:
    """
    Send a plain-text email via the authenticated Gmail account.

    Every failure is reported in the returned JSON rather than raised, in line
    with the other tools in this module.

    Args:
        recipient (str): Destination email address; must not be empty.
        subject (str): Subject line.
        body (str): Plain-text message body.

    Returns:
        str: JSON with either
            {"success": true, "message_id": "..."}
        or
            {"success": false, "error": "..."}
    """
    try:
        # Reject an empty recipient up front instead of sending a doomed request.
        if not recipient or not recipient.strip():
            return json.dumps(
                {"success": False, "error": "Recipient email address is required."},
                indent=2,
            )

        service = oauth_manager.get_gmail_service()
        if service is None:
            return json.dumps(
                {"success": False, "error": "Not authenticated or failed to build service."},
                indent=2,
            )

        mime_msg = MIMEText(body or "", "plain", "utf-8")
        mime_msg["to"] = recipient.strip()
        mime_msg["subject"] = subject or ""

        # The message is passed to the API as URL-safe base64 in the "raw" field.
        raw_msg = base64.urlsafe_b64encode(mime_msg.as_bytes()).decode()

        sent = (
            service.users()
            .messages()
            .send(userId="me", body={"raw": raw_msg})
            .execute()
        )
        return json.dumps(
            {"success": True, "message_id": sent.get("id")}, indent=2
        )

    except HttpError as err:
        logger.error("Error sending email: %s", err)
        # Prefer the error's `error_details` attribute when it is set and non-empty.
        error_detail = getattr(err, "error_details", None) or str(err)
        return json.dumps(
            {"success": False, "error": error_detail},
            indent=2,
        )

    except Exception as err:
        # Anything else raised while sending is reported the same way.
        logger.error("Unexpected error sending email: %s", err)
        return json.dumps(
            {"success": False, "error": str(err)},
            indent=2,
        )
|
|
|
|
|
|
|
# --- Gradio front-ends: one gr.Interface per tool function ------------------
# NOTE(review): several labels below start with glyphs that look like
# mis-decoded emoji, and some placeholders read "[email protected]"; they are
# reproduced exactly as found - confirm against the file's real encoding.

# Email tools
search_interface = gr.Interface(
    title="Email Search (OAuth)",
    description="Search your emails by sender keyword and date range with OAuth authentication",
    fn=search_emails,
    inputs=[
        gr.Textbox(label="Sender Keyword", placeholder="apple, amazon, etc."),
        gr.Textbox(label="Start Date (Optional)", placeholder="01-Jan-2025 (leave empty for last 7 days)"),
        gr.Textbox(label="End Date (Optional)", placeholder="07-Jan-2025 (leave empty for today)")
    ],
    outputs=gr.Textbox(label="Search Results", lines=20)
)

details_interface = gr.Interface(
    title="Email Details (OAuth)",
    description="Get full details of a specific email by message ID with OAuth authentication",
    fn=get_email_details,
    inputs=[
        gr.Textbox(label="Message ID", placeholder="Email message ID from search results")
    ],
    outputs=gr.Textbox(label="Email Details", lines=20)
)

analysis_interface = gr.Interface(
    title="Email Pattern Analysis (OAuth)",
    description="Analyze email patterns from a specific sender over time with OAuth authentication",
    fn=analyze_email_patterns,
    inputs=[
        gr.Textbox(label="Sender Keyword", placeholder="amazon, google, linkedin, etc."),
        gr.Textbox(label="Days Back", value="30", placeholder="Number of days to analyze")
    ],
    outputs=gr.Textbox(label="Analysis Results", lines=20)
)

# Authentication and account management
auth_interface = gr.Interface(
    title="Authenticate with Gmail",
    description="Click Submit to start OAuth authentication flow with Gmail",
    fn=authenticate_user,
    inputs=[],
    outputs=gr.Textbox(label="Authentication Result", lines=10)
)

status_interface = gr.Interface(
    title="Authentication Status",
    description="Check current authentication status and view all accounts",
    fn=get_authentication_status,
    inputs=[],
    outputs=gr.Textbox(label="Authentication Status", lines=15)
)

switch_interface = gr.Interface(
    title="Switch Account",
    description="Switch to a different authenticated Gmail account",
    fn=switch_account,
    inputs=[
        gr.Textbox(label="Target Email", placeholder="[email protected]")
    ],
    outputs=gr.Textbox(label="Switch Result", lines=10)
)

accounts_interface = gr.Interface(
    title="List All Accounts",
    description="View all authenticated Gmail accounts and their status",
    fn=list_accounts,
    inputs=[],
    outputs=gr.Textbox(label="Accounts List", lines=15)
)

remove_interface = gr.Interface(
    title="Remove Account",
    description="Remove an authenticated Gmail account and its credentials",
    fn=remove_account,
    inputs=[
        gr.Textbox(label="Email to Remove", placeholder="[email protected]")
    ],
    outputs=gr.Textbox(label="Removal Result", lines=10)
)

# Outgoing mail
send_interface = gr.Interface(
    title="βοΈ Send Email",
    description="Send an email via Gmail using OAuth authenticated account",
    fn=send_email,
    inputs=[
        gr.Textbox(label="Recipient Email", placeholder="[email protected]"),
        gr.Textbox(label="Subject", placeholder="Email subject"),
        gr.Textbox(label="Body", placeholder="Email body text", lines=5)
    ],
    outputs=gr.Textbox(label="Send Result", lines=10)
)

# Combine everything into one tabbed app; interfaces and tab names are
# passed as two parallel lists in the same order.
demo = gr.TabbedInterface(
    interface_list=[
        auth_interface,
        status_interface,
        accounts_interface,
        switch_interface,
        remove_interface,
        search_interface,
        details_interface,
        analysis_interface,
        send_interface,
    ],
    tab_names=[
        "π Authenticate",
        "π Status",
        "π₯ All Accounts",
        "π Switch Account",
        "ποΈ Remove Account",
        "π§ Email Search",
        "π Email Details",
        "π Pattern Analysis",
        "βοΈ Send Email",
    ],
    title="π§ Gmail Assistant MCP Server (Multi-Account OAuth)"
)
|
|
|
if __name__ == "__main__":
    # Set before launching so Gradio also serves the MCP endpoint printed below.
    os.environ["GRADIO_MCP_SERVER"] = "True"

    # Report the stored-account state on startup.
    # NOTE(review): the emoji prefixes in this block arrived mis-decoded. Those
    # whose bytes decode unambiguously were restored; the remaining glyphs are
    # kept as found - confirm against the file's real encoding.
    current_account = oauth_manager.get_current_account()
    all_accounts = oauth_manager.list_accounts()

    if current_account and oauth_manager.is_authenticated():
        print(f"✅ Currently authenticated as: {current_account}")
        if len(all_accounts) > 1:
            print(f"📱 {len(all_accounts)} total accounts available: {list(all_accounts.keys())}")
    elif all_accounts:
        print(f"📱 {len(all_accounts)} stored accounts found: {list(all_accounts.keys())}")
        print("⚠️ No current account selected. Use the web interface or Claude to switch accounts.")
    else:
        print("β No authenticated accounts. Users will need to authenticate through the web interface.")
        print("💡 Or run 'python setup_oauth.py' for initial setup.")

    # launch() normally blocks until shutdown, which would keep the hints below
    # from appearing while the server is up. Start without blocking, print the
    # hints, then block explicitly.
    demo.launch(share=False, prevent_thread_lock=True)

    print("\nπ MCP Server is running!")
    print("π MCP Endpoint: http://localhost:7860/gradio_api/mcp/sse")
    print("π Copy this URL to your Claude Desktop MCP configuration")
    print("\nπ Web Interface: http://localhost:7860")
    print("π Use the web interface to authenticate and test the tools")

    # Keep the process (and therefore the server) alive until interrupted.
    demo.block_thread()