File size: 9,277 Bytes
2312d97
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import gradio as gr
import json
import os
from typing import Dict, List
from datetime import datetime, timedelta
from dotenv import load_dotenv

# Import your existing modules
from tools import extract_query_info, analyze_emails
from email_scraper import scrape_emails_by_text_search_with_credentials, _load_email_db
from logger import logger

load_dotenv()

def search_emails(email_address: str, app_password: str, query: str) -> str:
    """
    Run a natural-language email search and report the matches as JSON.

    Args:
        email_address (str): The Gmail address to connect to
        app_password (str): The Gmail app password for authentication
        query (str): Free-form request (e.g., "show me mails from swiggy last week")

    Returns:
        str: Pretty-printed JSON. On success it carries "query_info",
            "email_summary", "analysis" and "email_count"; on failure it
            carries "error", "query" and "message".
    """
    try:
        logger.info("Email MCP tool called with query: %s", query)

        # Turn the free-form query into a sender keyword plus date bounds
        parsed_query = extract_query_info(query)
        keyword = parsed_query.get("sender_keyword", "")
        date_from = parsed_query.get("start_date")
        date_to = parsed_query.get("end_date")

        print(f"Searching for emails with keyword '{keyword}' between {date_from} and {date_to}")

        # Fetch the matching messages using the caller-supplied credentials
        matches = scrape_emails_by_text_search_with_credentials(
            email_address, app_password, keyword, date_from, date_to
        )

        if matches:
            # Header-only view of each message; bodies are left out of the summary
            headers = [
                {
                    "date": message.get("date"),
                    "time": message.get("time"),
                    "subject": message.get("subject"),
                    "from": message.get("from", "Unknown Sender"),
                    "message_id": message.get("message_id")
                }
                for message in matches
            ]
            # Insights are computed from the full messages, not the summaries
            findings = analyze_emails(matches)
            total = len(matches)
        else:
            headers = []
            findings = {
                "summary": f"No emails found for '{keyword}' in the specified date range.",
                "insights": []
            }
            total = 0

        payload = {
            "query_info": parsed_query,
            "email_summary": headers,
            "analysis": findings,
            "email_count": total
        }
        return json.dumps(payload, indent=2)

    except Exception as exc:
        # Tool boundary: report the failure as JSON instead of raising
        logger.error("Error in search_emails: %s", exc)
        failure = {
            "error": str(exc),
            "query": query,
            "message": "Failed to search emails. Please check your credentials and try again."
        }
        return json.dumps(failure, indent=2)


def get_email_details(email_address: str, app_password: str, message_id: str) -> str:
    """
    Look up one email in the local cache by its message ID.

    Args:
        email_address (str): The Gmail address (not used by this lookup)
        app_password (str): The Gmail app password (not used by this lookup)
        message_id (str): The message ID of the email to retrieve

    Returns:
        str: Pretty-printed JSON of the cached email record, or a JSON
            object with "error" and "message" keys when nothing matches
            or the lookup fails.
    """
    try:
        logger.info("Getting email details for message_id: %s", message_id)

        cached = _load_email_db()

        # First record, across every sender's list, whose ID matches
        hit = next(
            (
                record
                for sender_entry in cached.values()
                for record in sender_entry.get("emails", [])
                if record.get("message_id") == message_id
            ),
            None,
        )
        if hit is not None:
            return json.dumps(hit, indent=2)

        # Nothing in the cache carries this ID
        return json.dumps(
            {
                "error": f"No email found with message_id '{message_id}'",
                "message": "Email may not be in local cache. Try searching for emails first."
            },
            indent=2,
        )

    except Exception as exc:
        # Tool boundary: report the failure as JSON instead of raising
        logger.error("Error in get_email_details: %s", exc)
        return json.dumps(
            {
                "error": str(exc),
                "message_id": message_id,
                "message": "Failed to retrieve email details."
            },
            indent=2,
        )


def analyze_email_patterns(email_address: str, app_password: str, sender_keyword: str, days_back: str = "30") -> str:
    """
    Analyze email patterns from a specific sender over a given time period.

    The look-back window ends today and starts ``days_back`` days earlier.
    ``days_back`` is validated before any mailbox access: a value that is
    not a whole number, or that is negative (which would put the start
    date after the end date), is rejected with a descriptive JSON error.

    Args:
        email_address (str): The Gmail address to connect to
        app_password (str): The Gmail app password for authentication
        sender_keyword (str): The sender/company keyword to analyze (e.g., "amazon", "google")
        days_back (str): Number of days to look back (default: "30")

    Returns:
        str: JSON string containing email pattern analysis. On success it
            carries "sender_keyword", "date_range", "analysis" and
            "email_count"; on failure it carries "error",
            "sender_keyword" and "message".
    """
    # Validate the window first so a bad value never reaches the mailbox
    # and the caller is told exactly what is wrong with it.
    try:
        days_int = int(days_back)
    except (TypeError, ValueError):
        days_int = None

    if days_int is None or days_int < 0:
        invalid_result = {
            "error": f"Invalid days_back value {days_back!r}: expected a non-negative whole number of days.",
            "sender_keyword": sender_keyword,
            "message": "Failed to analyze email patterns."
        }
        return json.dumps(invalid_result, indent=2)

    try:
        logger.info("Analyzing email patterns for sender: %s, days_back: %s", sender_keyword, days_back)

        # Calculate date range
        end_date = datetime.today()
        start_date = end_date - timedelta(days=days_int)

        # Dates are passed to the scraper as DD-Mon-YYYY strings
        start_date_str = start_date.strftime("%d-%b-%Y")
        end_date_str = end_date.strftime("%d-%b-%Y")

        # Search for emails
        full_emails = scrape_emails_by_text_search_with_credentials(
            email_address, app_password, sender_keyword, start_date_str, end_date_str
        )

        if not full_emails:
            result = {
                "sender_keyword": sender_keyword,
                "date_range": f"{start_date_str} to {end_date_str}",
                "analysis": {"summary": f"No emails found from '{sender_keyword}' in the last {days_back} days.", "insights": []},
                "email_count": 0
            }
            return json.dumps(result, indent=2)

        # Analyze the emails
        analysis = analyze_emails(full_emails)

        result = {
            "sender_keyword": sender_keyword,
            "date_range": f"{start_date_str} to {end_date_str}",
            "analysis": analysis,
            "email_count": len(full_emails)
        }

        return json.dumps(result, indent=2)

    except Exception as e:
        # Tool boundary: report the failure as JSON instead of raising
        logger.error("Error in analyze_email_patterns: %s", e)
        error_result = {
            "error": str(e),
            "sender_keyword": sender_keyword,
            "message": "Failed to analyze email patterns."
        }
        return json.dumps(error_result, indent=2)


def _credential_inputs() -> list:
    """Build a fresh pair of address/password textboxes for one interface.

    Each interface gets its own component instances rather than sharing
    one pair across all three.
    """
    return [
        gr.Textbox(label="Email Address", placeholder="your.email@gmail.com"),
        gr.Textbox(label="App Password", type="password", placeholder="Your Gmail app password"),
    ]


# Create the Gradio interface for email search
search_interface = gr.Interface(
    fn=search_emails,
    inputs=[
        *_credential_inputs(),
        gr.Textbox(label="Query", placeholder="Show me emails from amazon last week")
    ],
    outputs=gr.Textbox(label="Search Results", lines=20),
    title="Email Search",
    description="Search your emails using natural language queries"
)

# Create the Gradio interface for email details
details_interface = gr.Interface(
    fn=get_email_details,
    inputs=[
        *_credential_inputs(),
        gr.Textbox(label="Message ID", placeholder="Email message ID from search results")
    ],
    outputs=gr.Textbox(label="Email Details", lines=20),
    title="Email Details",
    description="Get full details of a specific email by message ID"
)

# Create the Gradio interface for email pattern analysis
analysis_interface = gr.Interface(
    fn=analyze_email_patterns,
    inputs=[
        *_credential_inputs(),
        gr.Textbox(label="Sender Keyword", placeholder="amazon, google, linkedin, etc."),
        gr.Textbox(label="Days Back", value="30", placeholder="Number of days to analyze")
    ],
    outputs=gr.Textbox(label="Analysis Results", lines=20),
    title="Email Pattern Analysis",
    description="Analyze email patterns from a specific sender over time"
)

# Combine interfaces into a tabbed interface; tab labels follow the list order
demo = gr.TabbedInterface(
    [search_interface, details_interface, analysis_interface],
    ["Email Search", "Email Details", "Pattern Analysis"],
    title="📧 Email Assistant MCP Server"
)

if __name__ == "__main__":
    # Set environment variable to enable MCP server (must happen before launch)
    os.environ["GRADIO_MCP_SERVER"] = "True"

    # Start the server without blocking, so the connection hints below are
    # printed while it is running rather than only after it shuts down.
    demo.launch(share=False, prevent_thread_lock=True)

    print("\n🚀 MCP Server is running!")
    print("📍 MCP Endpoint: http://localhost:7860/gradio_api/mcp/sse")
    print("📖 Copy this URL to your Claude Desktop MCP configuration")

    # Keep the process alive until the user interrupts it
    demo.block_thread()