|
|
|
|
|
import os |
|
import json |
|
import re |
|
from typing import Any, Dict, Tuple, Optional |
|
from datetime import datetime |
|
|
|
from dotenv import load_dotenv |
|
from openai import OpenAI |
|
|
|
from schemas import Plan, PlanStep, FetchEmailsParams |
|
from tools import TOOL_MAPPING |
|
|
|
|
|
# Read a local .env file (if any) into the process environment first, so the
# key lookup below can see values defined there.
load_dotenv()

# Fail at import time when the key is absent or empty: every LLM call in this
# module goes through the shared client created here.
api_key = os.environ.get("OPENAI_API_KEY")
if api_key is None or api_key == "":
    raise RuntimeError("Missing OPENAI_API_KEY in environment")

client = OpenAI(api_key=api_key)

# JSON file holding the name -> email address mapping. The path is relative,
# so it is resolved against the current working directory.
NAME_MAPPING_FILE = "name_mapping.json"
|
|
|
|
|
# System prompt for the planning call: lists the available actions and asks
# the model for an ordered list of action names (no parameters) as JSON.
# The bullet and arrow characters were previously mis-encoded ("β’" / "β")
# and reached the model as garbage; they are restored here.
SYSTEM_PLAN_PROMPT = """
You are an email assistant agent. You have access to the following actions:

• fetch_emails - fetch emails based on sender and date criteria (includes date extraction)
• show_email - display specific email content
• analyze_emails - analyze email patterns or content
• draft_reply - create a reply to an email
• send_reply - send a drafted reply
• done - complete the task

When the user gives you a query, output _only_ valid JSON of this form:

{
  "plan": [
    "fetch_emails",
    ...,
    "done"
  ]
}

Rules:
- Use "fetch_emails" when you need to retrieve emails (it automatically handles date extraction)
- The final entry _must_ be "done"
- If no tool is needed, return `{"plan":["done"]}`

Example: For "show me emails from dev today" → ["fetch_emails", "done"]
"""
|
|
|
# Template for the per-step validation prompt. It is filled with str.format,
# so {context} and {action} are placeholders while the doubled braces render
# as the literal braces of the JSON reply shape.
SYSTEM_VALIDATOR_TEMPLATE = (
    "\n"
    "You are a plan validator.\n"
    "Context (results so far):\n"
    "{context}\n"
    "\n"
    "Next action:\n"
    "{action}\n"
    "\n"
    "Reply _only_ with JSON:\n"
    "{{\n"
    '  "should_execute": <true|false>,\n'
    "  \"parameters\": <null or a JSON object with this action's parameters>\n"
    "}}\n"
)
|
|
|
|
|
def _load_name_mapping() -> Dict[str, str]:
    """Load the name -> email mapping stored in ``NAME_MAPPING_FILE``.

    This is a best-effort read: any problem with the file yields an empty
    mapping instead of an exception.

    Returns:
        The mapping read from disk, or an empty dict when the file is
        missing, unreadable, not valid JSON, or does not contain a JSON
        object.
    """
    try:
        # Open directly rather than checking os.path.exists first: a missing
        # file surfaces as FileNotFoundError (an OSError), which removes the
        # window between the check and the open.
        with open(NAME_MAPPING_FILE, "r", encoding="utf-8") as f:
            mapping = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {}

    # Callers use .items() and item assignment on the result, so a file that
    # holds some other JSON value (list, string, ...) is treated as empty.
    if not isinstance(mapping, dict):
        return {}
    return mapping
|
|
|
|
|
def _save_name_mapping(mapping: Dict[str, str]) -> None:
    """Write the name -> email mapping to ``NAME_MAPPING_FILE`` as JSON.

    The data is first written to a sibling temporary file and then moved over
    the real file. Opening the real file in "w" mode would truncate it before
    serialization starts, so a failure part-way through would destroy the
    previously saved mapping.

    Args:
        mapping: The complete mapping to persist; it replaces the file's
            previous contents.

    Raises:
        TypeError: If ``mapping`` contains values JSON cannot serialize.
        OSError: If the file cannot be written or replaced.
    """
    tmp_path = f"{NAME_MAPPING_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(mapping, f, indent=2)
        # Swap the finished file into place in a single step.
        os.replace(tmp_path, NAME_MAPPING_FILE)
    finally:
        # After a successful replace the temp file no longer exists; after a
        # failure this removes the partial file. Either way a missing file is
        # fine, and the original exception (if any) keeps propagating.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
|
|
|
|
|
def store_name_email_mapping(name: str, email: str):
    """Remember which email address belongs to a name.

    Both values are lower-cased and stripped of surrounding whitespace before
    being stored. An existing entry for the same normalized name is
    overwritten, and the whole mapping is written back to disk.
    """
    known = _load_name_mapping()
    address = email.lower().strip()
    key = name.lower().strip()
    known[key] = address
    _save_name_mapping(known)
|
|
|
|
|
def extract_sender_info(query: str) -> Dict:
    """Extract the sender the user is asking about from a free-text query.

    Sends ``query`` to the chat model together with a parsing prompt and
    decodes the JSON object the model returns.

    Args:
        query: The raw user request, e.g. "emails from dev last week".

    Returns:
        The decoded JSON object. The prompt asks the model for a single
        ``"sender_intent"`` key (a name, company, or email address), but the
        key is not enforced here, so callers should read it with ``.get``.
        An empty dict is returned when the model sends back no content.

    Raises:
        json.JSONDecodeError: If the reply is not valid JSON.
        ValueError: If the reply is valid JSON but not a JSON object.
    """
    # The example arrows and email addresses in this prompt were previously
    # garbled ("β", "[email protected]"); they are written out properly here
    # so the model sees meaningful examples.
    system_prompt = """
    You are an email query parser that extracts sender information.

    Given a user query, extract the sender intent - the person/entity they want emails from.
    This could be:
    - A person's name (e.g., "dev", "john smith", "dev agarwal")
    - A company/service (e.g., "amazon", "google", "linkedin")
    - An email address (e.g., "john@example.com")

    Examples:
    - "emails from dev agarwal last week" → "dev agarwal"
    - "show amazon emails from last month" → "amazon"
    - "emails from john@example.com yesterday" → "john@example.com"
    - "get messages from sarah" → "sarah"

    Return ONLY valid JSON:
    {
        "sender_intent": "extracted name, company, or email"
    }
    """

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.0,
        # JSON mode: the reply is a bare JSON object, never prose or a
        # fenced code block that json.loads would choke on.
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ],
    )

    content = response.choices[0].message.content
    if not content:
        # No text came back; report "no sender found" rather than crashing
        # in json.loads on None.
        return {}

    result = json.loads(content)
    if not isinstance(result, dict):
        raise ValueError(f"Expected a JSON object from the LLM, got: {content!r}")
    return result
|
|
|
|
|
def resolve_sender_email(sender_intent: str) -> Tuple[Optional[str], bool]:
    """Resolve a sender intent (name, company, or address) to an email address.

    Resolution order:
      1. an intent containing "@" is taken to be an address already;
      2. an exact match on a stored name;
      3. a partial match, where the intent contains a stored name or a stored
         name contains the intent (first match in mapping order wins).

    Args:
        sender_intent: The sender text extracted from the user's query.

    Returns:
        ``(email_address, needs_user_input)``: the resolved address and
        ``False``, or ``(None, True)`` when nothing matched and the user has
        to supply the address.
    """
    normalized_intent = (sender_intent or "").lower().strip()

    # A blank intent must not reach the substring test below: "" is contained
    # in every string, so it would "match" the first stored name.
    if not normalized_intent:
        return None, True

    # Normalize addresses the same way they are stored (lower-cased, stripped).
    if "@" in normalized_intent:
        return normalized_intent, False

    name_mapping = _load_name_mapping()

    if normalized_intent in name_mapping:
        return name_mapping[normalized_intent], False

    for name, email in name_mapping.items():
        known_name = name.lower().strip()
        if not known_name:
            # A blank stored name would be a substring of every intent.
            continue
        if normalized_intent in known_name or known_name in normalized_intent:
            return email, False

    return None, True
|
|
|
|
|
def get_plan_from_llm(user_query: str) -> Plan:
    """Ask the LLM which actions to run, in order.

    Only action names are planned here; parameters are filled in later, one
    step at a time.

    Args:
        user_query: The raw user request.

    Returns:
        A ``Plan`` with one ``PlanStep`` per action name returned by the
        model, in the model's order.

    Raises:
        json.JSONDecodeError: If the reply is not valid JSON.
        KeyError: If the reply has no ``"plan"`` key.
        ValueError: If ``"plan"`` is not a list.
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.0,
        # JSON mode: the reply is a bare JSON object, never prose or a
        # fenced code block that json.loads would choke on.
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": SYSTEM_PLAN_PROMPT},
            {"role": "user", "content": user_query},
        ],
    )

    plan_json = json.loads(response.choices[0].message.content)
    actions = plan_json["plan"]
    # A bare string here would be iterated character by character and
    # silently produce a nonsense plan, so reject anything but a list.
    if not isinstance(actions, list):
        raise ValueError(f"Expected 'plan' to be a list of actions, got: {actions!r}")

    return Plan(plan=[PlanStep(action=action) for action in actions])
|
|
|
|
|
def think(
    step: PlanStep,
    context: Dict[str, Any],
    user_query: str,
) -> Tuple[bool, Optional[PlanStep], Optional[str]]:
    """Decide whether a planned step should run, and with which parameters.

    - ``fetch_emails``: the sender is extracted from ``user_query`` and
      resolved to an address; the raw query is passed along as the ``query``
      parameter (per the plan prompt, the fetch action handles date
      extraction itself).
    - any other action: the validator LLM is shown the context gathered so
      far and asked whether to run the step and with which parameters.

    Args:
        step: The planned step; only its ``action`` is read.
        context: Results collected from earlier steps.
        user_query: The raw user request.

    Returns:
        ``(should_execute, updated_step, user_prompt)``:
        - ``(True, step_with_parameters, None)`` when the step should run;
        - ``(False, None, message)`` when the sender's address is unknown and
          the user has to be asked for it;
        - ``(False, None, None)`` when the step should be skipped.
    """
    if step.action == "fetch_emails":
        sender_info = extract_sender_info(user_query)
        sender_intent = sender_info.get("sender_intent", "")

        # No sender could be extracted from the query: skip the step.
        if not sender_intent:
            return False, None, None

        email_address, needs_input = resolve_sender_email(sender_intent)

        if needs_input:
            prompt_msg = f"I don't have an email address for '{sender_intent}'. Please provide the email address:"
            return False, None, prompt_msg

        params = FetchEmailsParams(
            email=email_address,
            query=user_query,
        )
        return True, PlanStep(action="fetch_emails", parameters=params), None

    # NOTE(review): user_query is not shown to the validator, so it can only
    # derive parameters from `context` — confirm that this is intended.
    prompt = SYSTEM_VALIDATOR_TEMPLATE.format(
        # Tool results are arbitrary objects (Dict[str, Any]); default=str
        # keeps prompt building from raising TypeError on values that JSON
        # cannot encode (datetimes, model instances, ...).
        context=json.dumps(context, indent=2, default=str),
        action=step.action,
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.0,
        # JSON mode: the reply is a bare JSON object, never prose or a
        # fenced code block that json.loads would choke on.
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": "Validate or supply parameters for this action."},
            {"role": "user", "content": prompt},
        ],
    )
    verdict = json.loads(response.choices[0].message.content)

    # A missing "should_execute" is treated as "do not run".
    if not verdict.get("should_execute", False):
        return False, None, None

    return True, PlanStep(
        action=step.action,
        parameters=verdict.get("parameters"),
    ), None
|
|
|
|
|
def act(step: PlanStep) -> Any:
    """Dispatch a step to its implementation registered in ``TOOL_MAPPING``.

    Args:
        step: The step to run. ``step.action`` selects the tool and
            ``step.parameters`` (if any) is expanded into keyword arguments.

    Returns:
        Whatever the tool function returns.

    Raises:
        ValueError: If no tool is registered for ``step.action``.
    """
    fn = TOOL_MAPPING.get(step.action)
    if fn is None:
        raise ValueError(f"Unknown action '{step.action}'")

    params = step.parameters
    if not params:
        kwargs = {}
    elif hasattr(params, "model_dump"):
        # Pydantic model, e.g. FetchEmailsParams.
        kwargs = params.model_dump()
    else:
        # Steps built from the validator's JSON verdict may carry a plain
        # dict, which has no model_dump(); accept any mapping as-is.
        kwargs = dict(params)

    return fn(**kwargs)