import json
import os
from datetime import datetime, timedelta
from typing import Dict, List

from dotenv import load_dotenv
from openai import OpenAI

from email_scraper import (
    scrape_emails_by_text_search,
    _load_email_db,
    _save_email_db,
    _is_date_in_range,
)
from schemas import (
    FetchEmailsParams,
    ShowEmailParams,
    AnalyzeEmailsParams,
    DraftReplyParams,
    SendReplyParams,
)

load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=OPENAI_API_KEY)


def extract_query_info(query: str) -> Dict[str, str]:
    """
    Use an LLM to extract sender information and a date range from a user query.

    Returns {"sender_keyword": "...", "start_date": "DD-MMM-YYYY", "end_date": "DD-MMM-YYYY"}.
    """
    today_str = datetime.today().strftime("%d-%b-%Y")
    yesterday = (datetime.today() - timedelta(days=1)).strftime("%d-%b-%Y")
    five_days_ago = (datetime.today() - timedelta(days=5)).strftime("%d-%b-%Y")
    week_ago = (datetime.today() - timedelta(days=7)).strftime("%d-%b-%Y")

    system_prompt = f"""
You are a query parser for email search. Today is {today_str}.

Given a user query, extract the sender/company keyword and date range. Return _only_ valid JSON with:
{{
    "sender_keyword": "keyword or company name to search for",
    "start_date": "DD-MMM-YYYY",
    "end_date": "DD-MMM-YYYY"
}}

Rules:
1. Extract sender keywords from phrases like "from swiggy", "swiggy emails", "mails from amazon", etc.
2. If no time is mentioned, use the last 5 days: {five_days_ago} to {today_str}
3. Interpret relative dates as:
   - "today" → {today_str} to {today_str}
   - "yesterday" → {yesterday} to {yesterday}
   - "last week" → {week_ago} to {today_str}
   - "last month" → 30 days ago to {today_str}
   - "last N days" → N days ago to {today_str}

Examples:
- "show me mails for last week from swiggy"
  → {{"sender_keyword": "swiggy", "start_date": "{week_ago}", "end_date": "{today_str}"}}
- "emails from amazon yesterday"
  → {{"sender_keyword": "amazon", "start_date": "{yesterday}", "end_date": "{yesterday}"}}
- "show flipkart emails"
  → {{"sender_keyword": "flipkart", "start_date": "{five_days_ago}", "end_date": "{today_str}"}}

Return _only_ the JSON object, with no extra text.
"""

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": query},
    ]
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.0,
        messages=messages,
    )
    content = resp.choices[0].message.content.strip()

    try:
        return json.loads(content)
    except json.JSONDecodeError:
        # Fall back to the first {...} block if the model wrapped the JSON in extra text.
        start = content.find("{")
        end = content.rfind("}") + 1
        if start == -1 or end == 0:
            raise ValueError(f"Could not parse JSON from model response: {content!r}")
        return json.loads(content[start:end])
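

# Hedged usage sketch (illustrative, not part of the original module): with a
# valid OPENAI_API_KEY set, a call like the one below is expected to return a
# dict of the shape documented above; the exact dates depend on the current day.
#
#   info = extract_query_info("show me mails for last week from swiggy")
#   # e.g. {"sender_keyword": "swiggy",
#   #       "start_date": "01-Jun-2025", "end_date": "08-Jun-2025"}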


def fetch_emails(query: str) -> Dict:
    """
    Fetch emails based on a natural-language query that contains sender
    information and a date range. Uses text-based search and returns only
    summary information, not full content.

    Args:
        query: The natural-language query (e.g., "show me mails for last week from swiggy").

    Returns:
        Dict with query_info, email_summary, analysis, and email_count.
    """
    query_info = extract_query_info(query)
    sender_keyword = query_info.get("sender_keyword", "")
    start_date = query_info.get("start_date")
    end_date = query_info.get("end_date")

    print(f"Searching for emails with keyword '{sender_keyword}' between {start_date} and {end_date}")

    full_emails = scrape_emails_by_text_search(sender_keyword, start_date, end_date)

    if not full_emails:
        return {
            "query_info": query_info,
            "email_summary": [],
            "analysis": {
                "summary": f"No emails found for '{sender_keyword}' in the specified date range.",
                "insights": [],
            },
            "email_count": 0,
        }

    # Build a lightweight summary per email; full content stays in the local cache.
    email_summary = []
    for email in full_emails:
        email_summary.append({
            "date": email.get("date"),
            "time": email.get("time"),
            "subject": email.get("subject"),
            "from": email.get("from", "Unknown Sender"),
            "message_id": email.get("message_id"),
        })

    analysis = analyze_emails(full_emails)

    return {
        "query_info": query_info,
        "email_summary": email_summary,
        "analysis": analysis,
        "email_count": len(full_emails),
    }
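

# Hedged illustration (not from the original source): a successful call is
# expected to return a payload shaped like the following, with message_id
# values that can be passed on to show_email().
#
#   result = fetch_emails("emails from amazon yesterday")
#   # {
#   #     "query_info": {"sender_keyword": "amazon", ...},
#   #     "email_summary": [{"date": ..., "subject": ..., "message_id": ...}, ...],
#   #     "analysis": {"summary": "...", "insights": ["...", ...]},
#   #     "email_count": 3,
#   # }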


def show_email(message_id: str) -> Dict:
    """
    Retrieve the full email record (date, time, subject, content, etc.)
    from the local cache by message_id.
    """
    db = _load_email_db()

    for sender_data in db.values():
        for email in sender_data.get("emails", []):
            if email.get("message_id") == message_id:
                return email

    raise ValueError(f"No email found with message_id '{message_id}'")
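

# Hedged chaining sketch (illustrative, not from the original source): fetch
# summaries first, then pull one full record from the cache by message_id.
#
#   result = fetch_emails("emails from amazon yesterday")
#   if result["email_count"]:
#       first_id = result["email_summary"][0]["message_id"]
#       full = show_email(first_id)
#       print(full["subject"], full.get("content", "")[:100])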


def draft_reply(email: Dict, tone: str) -> str:
    """
    Placeholder: draft a reply to the given email in the requested tone.
    Currently only logs the request and returns a stub string.
    """
    # Emails in this module carry "message_id", not "id".
    message_id = email.get("message_id", "<unknown>")
    print(f"Drafting reply for email {message_id} with tone: {tone}")
    return f"Drafted reply for email {message_id} with tone {tone}."
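

# A hedged sketch of a fuller draft_reply, reusing the chat-completions client
# defined above. The prompt wording and truncation limit are assumptions, not
# part of the original project; kept commented out so module behavior is unchanged.
#
# def draft_reply(email: Dict, tone: str) -> str:
#     prompt = (
#         f"Write a {tone} reply to the email below.\n\n"
#         f"Subject: {email.get('subject')}\n"
#         f"From: {email.get('from')}\n\n"
#         f"{email.get('content', '')[:1000]}"
#     )
#     resp = client.chat.completions.create(
#         model="gpt-4o-mini",
#         temperature=0.7,
#         messages=[{"role": "user", "content": prompt}],
#     )
#     return resp.choices[0].message.content.strip()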


def send_reply(message_id: str, reply_body: str) -> Dict:
    """
    Placeholder: send a reply to the given message. Currently only logs the
    request; it returns a status dict so the declared return type holds.
    """
    print(f"Sending reply to message {message_id} with body: {reply_body}")
    return {"message_id": message_id, "status": "not_sent", "detail": "send_reply is a stub"}
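

# A hedged sketch of what a real send path could look like using the standard
# library's smtplib. The SMTP host, port, and SMTP_USER/SMTP_PASSWORD env vars
# are placeholders, not values from the original project.
#
# import smtplib
# from email.message import EmailMessage
#
# def _send_via_smtp(to_addr: str, subject: str, body: str) -> None:
#     msg = EmailMessage()
#     msg["From"] = os.getenv("SMTP_USER")
#     msg["To"] = to_addr
#     msg["Subject"] = subject
#     msg.set_content(body)
#     with smtplib.SMTP_SSL("smtp.example.com", 465) as smtp:
#         smtp.login(os.getenv("SMTP_USER"), os.getenv("SMTP_PASSWORD"))
#         smtp.send_message(msg)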


def analyze_emails(emails: List[Dict]) -> Dict:
    """
    Summarize and extract insights from a list of emails.

    Returns a dict with this schema:
    {
        "summary": str,       # a concise overview of all emails
        "insights": [str]     # list of key observations or stats
    }
    """
    if not emails:
        return {"summary": "No emails to analyze.", "insights": []}

    # Send only lightweight metadata plus a short content preview to the model.
    simplified_emails = []
    for email in emails:
        simplified_emails.append({
            "date": email.get("date"),
            "time": email.get("time"),
            "subject": email.get("subject"),
            "from": email.get("from", "Unknown Sender"),
            "content_preview": (email.get("content", "")[:200] + "...") if email.get("content") else "",
        })

    emails_payload = json.dumps(simplified_emails, ensure_ascii=False)

    system_prompt = """
You are an expert email analyst. You will be given a JSON array of email objects,
each with keys: date, time, subject, from, content_preview.

Your job is to produce _only_ valid JSON with two fields:
1. summary: a 1–2 sentence high-level overview of these emails.
2. insights: a list of 3–5 bullet-style observations or statistics
   (e.g. "5 emails from Swiggy", "mostly promotional content", "received over 3 days").

Focus on metadata like senders, subjects, dates, and patterns rather than detailed content analysis.

Output exactly:

{
    "summary": "...",
    "insights": ["...", "...", ...]
}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Here are the emails:\n{emails_payload}"},
    ]

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.0,
        messages=messages,
    )

    content = response.choices[0].message.content.strip()
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        # Fall back to the first {...} block if the model wrapped the JSON in extra text.
        start = content.find("{")
        end = content.rfind("}") + 1
        if start == -1 or end == 0:
            raise ValueError(f"Could not parse JSON from model response: {content!r}")
        return json.loads(content[start:end])


TOOL_MAPPING = {
    "fetch_emails": fetch_emails,
    "show_email": show_email,
    "analyze_emails": analyze_emails,
    "draft_reply": draft_reply,
    "send_reply": send_reply,
}
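

# Hedged dispatch sketch (not part of the original module): a minimal manual
# call through TOOL_MAPPING. An agent layer would normally validate arguments
# against the matching *Params schema (e.g. FetchEmailsParams) before calling;
# that validation step is assumed here and omitted.
if __name__ == "__main__":
    tool_name = "fetch_emails"
    tool_args = {"query": "show flipkart emails"}
    result = TOOL_MAPPING[tool_name](**tool_args)
    print(json.dumps(result, indent=2, ensure_ascii=False))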