# orchestrator.py
import os
import json
import re
from typing import Any, Dict, Tuple, Optional
from datetime import datetime
from dotenv import load_dotenv
from openai import OpenAI
from schemas import Plan, PlanStep, FetchEmailsParams
from tools import TOOL_MAPPING
# Load .env and initialize OpenAI client
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise RuntimeError("Missing OPENAI_API_KEY in environment")
client = OpenAI(api_key=api_key)
# File paths for name mapping
NAME_MAPPING_FILE = "name_mapping.json"
# === Hard-coded list of available actions ===
SYSTEM_PLAN_PROMPT = """
You are an email assistant agent. You have access to the following actions:
• fetch_emails - fetch emails using text search with sender keywords and date extraction (e.g., "swiggy emails last week")
• show_email - display specific email content
• analyze_emails - analyze email patterns or content
• draft_reply - create a reply to an email
• send_reply - send a drafted reply
• done - complete the task
When the user gives you a query, output _only_ valid JSON of this form:
{
"plan": [
"fetch_emails",
...,
"done"
]
}
Rules:
- Use "fetch_emails" for text-based email search (automatically extracts sender keywords and dates)
- The final entry _must_ be "done"
- If no tool is needed, return `{"plan":["done"]}`
Example: For "show me emails from swiggy today" → ["fetch_emails", "done"]
"""
SYSTEM_VALIDATOR_TEMPLATE = """
You are a plan validator.
Context (results so far):
{context}
Next action:
{action}
Reply _only_ with JSON:
{{
"should_execute": <true|false>,
"parameters": <null or a JSON object with this action's parameters>
}}
"""
def _load_name_mapping() -> Dict[str, str]:
"""Load name to email mapping from JSON file"""
if not os.path.exists(NAME_MAPPING_FILE):
return {}
try:
with open(NAME_MAPPING_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {}
def _save_name_mapping(mapping: Dict[str, str]):
"""Save name to email mapping to JSON file"""
with open(NAME_MAPPING_FILE, "w") as f:
json.dump(mapping, f, indent=2)
def store_name_email_mapping(name: str, email: str):
"""Store new name to email mapping"""
name_mapping = _load_name_mapping()
name_mapping[name.lower().strip()] = email.lower().strip()
_save_name_mapping(name_mapping)
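# Illustrative format (an assumption consistent with the helpers above, not taken
# from the repo): the mapping file is a flat JSON object keyed by lowercased
# names, e.g.
#   {"dev agarwal": "dev@example.com", "sarah": "sarah@example.com"}
# so store_name_email_mapping("Dev Agarwal", "Dev@Example.com") would write the
# key "dev agarwal" with the value "dev@example.com". The addresses here are
# placeholders.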
def extract_sender_info(query: str) -> Dict:
"""
Extract sender information from user query using LLM
"""
system_prompt = """
You are an email query parser that extracts sender information.
Given a user query, extract the sender intent - the person/entity they want emails from.
This could be:
- A person's name (e.g., "dev", "john smith", "dev agarwal")
- A company/service (e.g., "amazon", "google", "linkedin")
- An email address (e.g., "[email protected]")
Examples:
- "emails from dev agarwal last week" β†’ "dev agarwal"
- "show amazon emails from last month" β†’ "amazon"
- "emails from [email protected] yesterday" β†’ "[email protected]"
- "get messages from sarah" β†’ "sarah"
Return ONLY valid JSON:
{
"sender_intent": "extracted name, company, or email"
}
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
temperature=0.0,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": query}
],
)
result = json.loads(response.choices[0].message.content)
return result
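# Illustrative call (assuming the model follows the prompt above):
#   extract_sender_info("emails from dev agarwal last week")
#   -> {"sender_intent": "dev agarwal"}
# Note that json.loads raises ValueError if the model wraps the object in any
# extra text.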
def resolve_sender_email(sender_intent: str) -> Tuple[Optional[str], bool]:
"""
Resolve sender intent to actual email address
Returns: (email_address, needs_user_input)
"""
# Check if it's already an email address
if "@" in sender_intent:
return sender_intent.lower(), False
# Load name mapping
name_mapping = _load_name_mapping()
# Normalize the intent (lowercase for comparison)
normalized_intent = sender_intent.lower().strip()
# Check direct match
if normalized_intent in name_mapping:
return name_mapping[normalized_intent], False
# Check partial matches (fuzzy matching)
for name, email in name_mapping.items():
if normalized_intent in name.lower() or name.lower() in normalized_intent:
return email, False
# No match found
return None, True
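# Minimal caller sketch (an assumption about how the two helpers above might be
# combined; not part of the original flow): extract the sender intent, try to
# resolve it, and fall back to asking the user and caching the answer. prompt_fn
# is a hypothetical stand-in for however the app collects input.
def _resolve_sender_from_query(query: str, prompt_fn=input) -> Optional[str]:
    intent = extract_sender_info(query).get("sender_intent", "").strip()
    if not intent:
        return None
    email, needs_user_input = resolve_sender_email(intent)
    if needs_user_input:
        email = prompt_fn(f"No email on file for '{intent}'. Enter one (blank to skip): ").strip()
        if not email:
            return None
        store_name_email_mapping(intent, email)
    return email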
def get_plan_from_llm(user_query: str) -> Plan:
"""
Ask the LLM which actions to run, in order. No parameters here.
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
temperature=0.0,
messages=[
{"role": "system", "content": SYSTEM_PLAN_PROMPT},
{"role": "user", "content": user_query},
],
)
plan_json = json.loads(response.choices[0].message.content)
steps = [PlanStep(action=a) for a in plan_json["plan"]]
return Plan(plan=steps)
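# Illustrative example (assuming the model follows SYSTEM_PLAN_PROMPT): for
# "show me emails from swiggy today" the completion should be
#   {"plan": ["fetch_emails", "done"]}
# which this function turns into
#   Plan(plan=[PlanStep(action="fetch_emails"), PlanStep(action="done")]).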
def think(
step: PlanStep,
context: Dict[str, Any],
user_query: str
) -> Tuple[bool, Optional[PlanStep], Optional[str]]:
"""
Fill in parameters or skip based on the action:
- fetch_emails: pass the raw query for text-based search and date extraction
- others: ask the LLM validator for params
Returns: (should_execute, updated_step, user_prompt_if_needed)
"""
# 1) fetch_emails β†’ pass the full query for text-based search and date extraction
if step.action == "fetch_emails":
params = FetchEmailsParams(
query=user_query # Pass the full query for keyword and date extraction
)
return True, PlanStep(action="fetch_emails", parameters=params), None
# 2) everything else β†’ validate & supply params via LLM
prompt = SYSTEM_VALIDATOR_TEMPLATE.format(
context=json.dumps(context, indent=2),
action=step.action,
)
response = client.chat.completions.create(
model="gpt-4o-mini",
temperature=0.0,
messages=[
{"role": "system", "content": "Validate or supply parameters for this action."},
{"role": "user", "content": prompt},
],
)
verdict = json.loads(response.choices[0].message.content)
if not verdict.get("should_execute", False):
return False, None, None
return True, PlanStep(
action=step.action,
parameters=verdict.get("parameters")
), None
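# Illustrative validator verdicts (assuming the model follows
# SYSTEM_VALIDATOR_TEMPLATE; the parameter keys are hypothetical and in practice
# depend on the tool schemas in schemas.py):
#   {"should_execute": false, "parameters": null}               -> step skipped
#   {"should_execute": true, "parameters": {"email_index": 0}}  -> step runs with
#     those parameters attached to the PlanStep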
def act(step: PlanStep) -> Any:
    """
    Dispatch to the actual implementation in tools.py.
    """
    fn = TOOL_MAPPING.get(step.action)
    if fn is None:
        raise ValueError(f"Unknown action '{step.action}'")
    # Parameters may be a pydantic model (e.g. FetchEmailsParams from think()) or
    # a plain dict supplied by the validator verdict; accept both.
    if step.parameters is None:
        kwargs = {}
    elif hasattr(step.parameters, "model_dump"):
        kwargs = step.parameters.model_dump()
    else:
        kwargs = dict(step.parameters)
    return fn(**kwargs)
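# Minimal usage sketch (an assumption, not part of the original module): wiring
# the pieces above together for a single query. It assumes each tool in
# TOOL_MAPPING returns something JSON-serializable and that running this module
# directly is acceptable; the real application may drive the loop differently.
if __name__ == "__main__":
    user_query = "show me emails from swiggy today"
    plan = get_plan_from_llm(user_query)
    context: Dict[str, Any] = {}
    for planned_step in plan.plan:
        if planned_step.action == "done":
            break
        should_execute, ready_step, _ = think(planned_step, context, user_query)
        if not should_execute or ready_step is None:
            continue
        context[ready_step.action] = act(ready_step)
    print(json.dumps(context, indent=2, default=str))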