# File size: 9,277 Bytes
# Revision: 2312d97
import gradio as gr
import json
import os
from typing import Dict, List
from datetime import datetime, timedelta
from dotenv import load_dotenv
# Import your existing modules
from tools import extract_query_info, analyze_emails
from email_scraper import scrape_emails_by_text_search_with_credentials, _load_email_db
from logger import logger
load_dotenv()
def search_emails(email_address: str, app_password: str, query: str) -> str:
    """
    Search for emails based on a natural language query and return a summary.

    The query is handed to ``extract_query_info`` to obtain a sender keyword
    and a date range, matching emails are fetched with the supplied
    credentials, and any results are passed to ``analyze_emails``.

    Args:
        email_address (str): The Gmail address to connect to
        app_password (str): The Gmail app password for authentication
        query (str): Natural language query (e.g., "show me mails from swiggy last week")

    Returns:
        str: JSON string containing email search results and analysis, with
            the keys ``query_info``, ``email_summary``, ``analysis`` and
            ``email_count``. If any step raises, a JSON string with the keys
            ``error``, ``query`` and ``message`` is returned instead.
    """
    try:
        logger.info("Email MCP tool called with query: %s", query)

        # Extract sender keyword and date range from query
        query_info = extract_query_info(query)
        sender_keyword = query_info.get("sender_keyword", "")
        start_date = query_info.get("start_date")
        end_date = query_info.get("end_date")

        # Reported through the logger (not print) so it ends up in the same
        # place as every other diagnostic this function emits.
        logger.info(
            "Searching for emails with keyword '%s' between %s and %s",
            sender_keyword,
            start_date,
            end_date,
        )

        full_emails = scrape_emails_by_text_search_with_credentials(
            email_address, app_password, sender_keyword, start_date, end_date
        )

        if not full_emails:
            # Nothing matched: skip the analysis step and report an empty result.
            empty_result = {
                "query_info": query_info,
                "email_summary": [],
                "analysis": {
                    "summary": f"No emails found for '{sender_keyword}' in the specified date range.",
                    "insights": [],
                },
                "email_count": 0,
            }
            return json.dumps(empty_result, indent=2)

        # Summary version of each email, without the full content
        email_summary = [
            {
                "date": msg.get("date"),
                "time": msg.get("time"),
                "subject": msg.get("subject"),
                "from": msg.get("from", "Unknown Sender"),
                "message_id": msg.get("message_id"),
            }
            for msg in full_emails
        ]

        # Auto-analyze the full emails (not the trimmed summaries) for insights
        analysis = analyze_emails(full_emails)

        result = {
            "query_info": query_info,
            "email_summary": email_summary,
            "analysis": analysis,
            "email_count": len(full_emails),
        }
        return json.dumps(result, indent=2)

    except Exception as e:
        # Tool boundary: every failure is reported to the caller as JSON
        # rather than propagated.
        logger.error("Error in search_emails: %s", e)
        error_result = {
            "error": str(e),
            "query": query,
            "message": "Failed to search emails. Please check your credentials and try again.",
        }
        return json.dumps(error_result, indent=2)
def get_email_details(email_address: str, app_password: str, message_id: str) -> str:
    """
    Look up one email by message ID and return its full stored record.

    Only the local email database returned by ``_load_email_db`` is consulted;
    ``email_address`` and ``app_password`` are accepted but not read here.

    Args:
        email_address (str): The Gmail address to connect to
        app_password (str): The Gmail app password for authentication
        message_id (str): The message ID of the email to retrieve

    Returns:
        str: JSON string containing the full email details, or a JSON string
            with ``error`` and ``message`` keys when no cached email matches
            or the lookup fails.
    """
    try:
        logger.info("Getting email details for message_id: %s", message_id)

        cache = _load_email_db()

        # Flatten every sender's "emails" list into one lazy stream.
        cached_entries = (
            entry
            for sender_record in cache.values()
            for entry in sender_record.get("emails", [])
        )
        # First entry whose ID matches, or None when the stream is exhausted.
        hit = next(
            (entry for entry in cached_entries if entry.get("message_id") == message_id),
            None,
        )
        if hit is not None:
            return json.dumps(hit, indent=2)

        # Not present in the cache
        return json.dumps(
            {
                "error": f"No email found with message_id '{message_id}'",
                "message": "Email may not be in local cache. Try searching for emails first.",
            },
            indent=2,
        )

    except Exception as e:
        logger.error("Error in get_email_details: %s", e)
        return json.dumps(
            {
                "error": str(e),
                "message_id": message_id,
                "message": "Failed to retrieve email details.",
            },
            indent=2,
        )
def _parse_days_back(days_back) -> int:
    """Convert ``days_back`` to a non-negative whole number of days, or raise ValueError."""
    try:
        days = int(days_back)
    except (TypeError, ValueError):
        raise ValueError(
            f"days_back must be a whole number of days, got {days_back!r}"
        ) from None
    if days < 0:
        # A negative window would put the start date after the end date.
        raise ValueError(f"days_back must not be negative, got {days}")
    return days


def analyze_email_patterns(email_address: str, app_password: str, sender_keyword: str, days_back: str = "30") -> str:
    """
    Analyze email patterns from a specific sender over a given time period.

    The period ends today and starts ``days_back`` days earlier. Emails
    matching ``sender_keyword`` in that period are fetched with the supplied
    credentials and passed to ``analyze_emails``.

    Args:
        email_address (str): The Gmail address to connect to
        app_password (str): The Gmail app password for authentication
        sender_keyword (str): The sender/company keyword to analyze (e.g., "amazon", "google")
        days_back (str): Number of days to look back (default: "30"). Must
            parse as a non-negative integer.

    Returns:
        str: JSON string containing email pattern analysis, with the keys
            ``sender_keyword``, ``date_range``, ``analysis`` and
            ``email_count``. If ``days_back`` is invalid or any step raises,
            a JSON string with the keys ``error``, ``sender_keyword`` and
            ``message`` is returned instead.
    """
    # Reject a bad look-back window before any other work, so the caller gets
    # a specific message instead of an inverted date range or a raw int() error.
    try:
        days_int = _parse_days_back(days_back)
    except ValueError as e:
        return json.dumps(
            {
                "error": str(e),
                "sender_keyword": sender_keyword,
                "message": "Failed to analyze email patterns.",
            },
            indent=2,
        )

    try:
        logger.info("Analyzing email patterns for sender: %s, days_back: %s", sender_keyword, days_back)

        # Calculate date range, formatted as e.g. "05-Jan-2025"
        end_date = datetime.today()
        start_date = end_date - timedelta(days=days_int)
        start_date_str = start_date.strftime("%d-%b-%Y")
        end_date_str = end_date.strftime("%d-%b-%Y")

        full_emails = scrape_emails_by_text_search_with_credentials(
            email_address, app_password, sender_keyword, start_date_str, end_date_str
        )

        if full_emails:
            analysis = analyze_emails(full_emails)
            email_count = len(full_emails)
        else:
            # Nothing matched: skip the analysis step and report an empty result.
            analysis = {
                "summary": f"No emails found from '{sender_keyword}' in the last {days_back} days.",
                "insights": [],
            }
            email_count = 0

        result = {
            "sender_keyword": sender_keyword,
            "date_range": f"{start_date_str} to {end_date_str}",
            "analysis": analysis,
            "email_count": email_count,
        }
        return json.dumps(result, indent=2)

    except Exception as e:
        # Tool boundary: every failure is reported to the caller as JSON
        # rather than propagated.
        logger.error("Error in analyze_email_patterns: %s", e)
        error_result = {
            "error": str(e),
            "sender_keyword": sender_keyword,
            "message": "Failed to analyze email patterns.",
        }
        return json.dumps(error_result, indent=2)
def _credential_inputs():
    """Build a fresh address/app-password textbox pair for one interface."""
    return [
        gr.Textbox(label="Email Address", placeholder="your-email@gmail.com"),
        gr.Textbox(label="App Password", type="password", placeholder="Your Gmail app password"),
    ]


# Email search tab: credentials plus a natural language query
search_interface = gr.Interface(
    fn=search_emails,
    inputs=[
        *_credential_inputs(),
        gr.Textbox(label="Query", placeholder="Show me emails from amazon last week"),
    ],
    outputs=gr.Textbox(label="Search Results", lines=20),
    title="Email Search",
    description="Search your emails using natural language queries",
)

# Email details tab: credentials plus the message ID shown in search results
details_interface = gr.Interface(
    fn=get_email_details,
    inputs=[
        *_credential_inputs(),
        gr.Textbox(label="Message ID", placeholder="Email message ID from search results"),
    ],
    outputs=gr.Textbox(label="Email Details", lines=20),
    title="Email Details",
    description="Get full details of a specific email by message ID",
)

# Pattern analysis tab: credentials, sender keyword and look-back window
analysis_interface = gr.Interface(
    fn=analyze_email_patterns,
    inputs=[
        *_credential_inputs(),
        gr.Textbox(label="Sender Keyword", placeholder="amazon, google, linkedin, etc."),
        gr.Textbox(label="Days Back", value="30", placeholder="Number of days to analyze"),
    ],
    outputs=gr.Textbox(label="Analysis Results", lines=20),
    title="Email Pattern Analysis",
    description="Analyze email patterns from a specific sender over time",
)

# Combine the three interfaces into a single tabbed app
demo = gr.TabbedInterface(
    [search_interface, details_interface, analysis_interface],
    ["Email Search", "Email Details", "Pattern Analysis"],
    title="📧 Email Assistant MCP Server",
)
if __name__ == "__main__":
    # Set environment variable to enable MCP server
    os.environ["GRADIO_MCP_SERVER"] = "True"

    # Print the connection hints BEFORE launching: launch() blocks the main
    # thread while the server runs, so anything after it would only be shown
    # once the server has stopped.
    print("\nStarting MCP Server...")
    # NOTE(review): URL assumes Gradio's default host/port (localhost:7860);
    # confirm if a different port is configured or 7860 is already in use.
    print("MCP Endpoint: http://localhost:7860/gradio_api/mcp/sse")
    print("Copy this URL to your Claude Desktop MCP configuration")

    # Launch the server
    demo.launch(share=False)