# orchestrator.py
import json
import logging
import os
import re
from datetime import datetime
from typing import Any, Dict, Tuple, Optional

from dotenv import load_dotenv
from openai import OpenAI

from schemas import Plan, PlanStep, FetchEmailsParams
from tools import TOOL_MAPPING
# Load .env and initialize OpenAI client.
# The key is checked at import time so a misconfigured environment fails
# immediately instead of on the first LLM call.
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("Missing OPENAI_API_KEY in environment")
# Shared client used by every LLM call in this module.
client = OpenAI(api_key=api_key)
# File path for the persisted name -> email mapping (relative to the
# process's working directory).
NAME_MAPPING_FILE = "name_mapping.json"
# === Hard-coded list of available actions ===
# System prompt for the planning call: it lists the actions the agent may use
# and pins the reply to a JSON object holding an ordered "plan" list.
SYSTEM_PLAN_PROMPT = """
You are an email assistant agent. You have access to the following actions:
• fetch_emails - fetch emails using text search with sender keywords and date extraction (e.g., "swiggy emails last week")
• show_email - display specific email content
• analyze_emails - analyze email patterns or content
• draft_reply - create a reply to an email
• send_reply - send a drafted reply
• done - complete the task
When the user gives you a query, output _only_ valid JSON of this form:
{
  "plan": [
    "fetch_emails",
    ...,
    "done"
  ]
}
Rules:
- Use "fetch_emails" for text-based email search (automatically extracts sender keywords and dates)
- The final entry _must_ be "done"
- If no tool is needed, return `{"plan":["done"]}`
Example: For "show me emails from swiggy today" → ["fetch_emails", "done"]
"""
# Prompt template for the per-step validator call. It is rendered with
# str.format(context=..., action=...), so the literal braces of the JSON
# example are doubled.
SYSTEM_VALIDATOR_TEMPLATE = """
You are a plan validator.
Context (results so far):
{context}
Next action:
{action}
Reply _only_ with JSON:
{{
  "should_execute": <true|false>,
  "parameters": <null or a JSON object with this action's parameters>
}}
"""
def _load_name_mapping() -> Dict[str, str]:
    """Load the name -> email mapping stored in ``NAME_MAPPING_FILE``.

    Loading is best-effort: any problem with the file yields an empty
    mapping instead of an exception.

    Returns:
        The mapping read from disk, or an empty dict when the file is
        missing, unreadable, not valid JSON, or not a JSON object.
    """
    try:
        with open(NAME_MAPPING_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # Nothing has been saved yet; this is the normal first-run state.
        return {}
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        logging.getLogger(__name__).warning(
            "Ignoring unreadable name mapping %s: %s", NAME_MAPPING_FILE, exc
        )
        return {}
    if not isinstance(data, dict):
        # Callers index and iterate the result as a dict; a JSON list or
        # scalar at the root would break them later on.
        logging.getLogger(__name__).warning(
            "Ignoring name mapping %s: expected a JSON object, got %s",
            NAME_MAPPING_FILE,
            type(data).__name__,
        )
        return {}
    return data
def _save_name_mapping(mapping: Dict[str, str]) -> None:
    """Persist the name -> email mapping to ``NAME_MAPPING_FILE`` as JSON.

    The data is first written to a sibling ``.tmp`` file and then moved over
    the target with ``os.replace``, so a failure while serializing or writing
    leaves the previously saved mapping untouched instead of truncated.

    Args:
        mapping: The complete mapping to store; it replaces the file content.

    Raises:
        TypeError: If ``mapping`` contains values that are not JSON
            serializable.
        OSError: If the file cannot be written or replaced.
    """
    tmp_path = f"{NAME_MAPPING_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(mapping, f, indent=2)
        os.replace(tmp_path, NAME_MAPPING_FILE)
    except Exception:
        # Do not leave a half-written temporary file behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def store_name_email_mapping(name: str, email: str) -> None:
    """Store a new name -> email mapping.

    Both values are lower-cased and stripped before being saved. Blank names
    or emails are ignored: an empty name is a substring of every query, so
    storing it would make substring-based lookups match it for any sender,
    and an empty email is not a usable address.

    Args:
        name: Human-readable sender name (e.g. "dev agarwal").
        email: Email address that the name should resolve to.
    """
    key = (name or "").lower().strip()
    address = (email or "").lower().strip()
    if not key or not address:
        return
    name_mapping = _load_name_mapping()
    name_mapping[key] = address
    _save_name_mapping(name_mapping)
def extract_sender_info(query: str) -> Dict:
    """Extract sender information from a user query using the LLM.

    Args:
        query: The raw user request, e.g. "emails from dev agarwal last week".

    Returns:
        The JSON object decoded from the model's reply. The prompt asks for a
        single ``"sender_intent"`` key; the presence of that key is not
        validated here.

    Raises:
        ValueError: If the model returns no content, or content that is not
            a JSON object (``json.JSONDecodeError`` is a ``ValueError``).
    """
    system_prompt = """
    You are an email query parser that extracts sender information.
    Given a user query, extract the sender intent - the person/entity they want emails from.
    This could be:
    - A person's name (e.g., "dev", "john smith", "dev agarwal")
    - A company/service (e.g., "amazon", "google", "linkedin")
    - An email address (e.g., "john@example.com")
    Examples:
    - "emails from dev agarwal last week" → "dev agarwal"
    - "show amazon emails from last month" → "amazon"
    - "emails from john@example.com yesterday" → "john@example.com"
    - "get messages from sarah" → "sarah"
    Return ONLY valid JSON:
    {
      "sender_intent": "extracted name, company, or email"
    }
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.0,
        # JSON mode: the reply is a bare JSON object rather than prose or a
        # markdown-fenced snippet, so json.loads below can parse it directly.
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ],
    )
    content = response.choices[0].message.content
    if content is None:
        # json.loads(None) would raise an unhelpful TypeError.
        raise ValueError("LLM returned no content for the sender extraction request")
    result = json.loads(content)
    if not isinstance(result, dict):
        raise ValueError(
            f"Expected a JSON object from the LLM, got {type(result).__name__}"
        )
    return result
def resolve_sender_email(sender_intent: str) -> Tuple[Optional[str], bool]:
    """Resolve a sender intent to an actual email address.

    Resolution order:
      1. A blank intent cannot be resolved and asks for user input.
      2. An intent containing "@" is treated as an email address already.
      3. Exact (case-insensitive) match against the stored name mapping.
      4. Partial match: the intent is contained in a stored name or a
         stored name is contained in the intent; the first hit wins.

    Args:
        sender_intent: Name, company, or email address extracted from the
            user's query.

    Returns:
        ``(email_address, needs_user_input)``: the resolved address with
        ``False``, or ``(None, True)`` when nothing matched.
    """
    normalized_intent = (sender_intent or "").lower().strip()
    if not normalized_intent:
        # "" is a substring of every stored name, so without this guard a
        # blank intent would silently resolve to the first mapping entry.
        return None, True

    # Check if it's already an email address
    if "@" in normalized_intent:
        return normalized_intent, False

    name_mapping = _load_name_mapping()

    # Check direct match
    if normalized_intent in name_mapping:
        return name_mapping[normalized_intent], False

    # Check partial matches (substring in either direction)
    for name, email in name_mapping.items():
        known_name = name.lower().strip()
        if not known_name:
            # A blank stored key would otherwise match every intent.
            continue
        if normalized_intent in known_name or known_name in normalized_intent:
            return email, False

    # No match found
    return None, True
def get_plan_from_llm(user_query: str) -> Plan:
    """Ask the LLM which actions to run, in order. No parameters here.

    Args:
        user_query: The raw user request.

    Returns:
        A ``Plan`` with one parameter-less ``PlanStep`` per action name. The
        plan always ends with a "done" step, as the planning prompt requires;
        one is appended if the model left it out.

    Raises:
        ValueError: If the model returns no content, invalid JSON, or a
            "plan" value that is not a list.
        KeyError: If the reply has no "plan" key.
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.0,
        # JSON mode: the reply is a bare JSON object rather than prose or a
        # markdown-fenced snippet, so json.loads below can parse it directly.
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": SYSTEM_PLAN_PROMPT},
            {"role": "user", "content": user_query},
        ],
    )
    content = response.choices[0].message.content
    if content is None:
        # json.loads(None) would raise an unhelpful TypeError.
        raise ValueError("LLM returned no content for the plan request")
    plan_json = json.loads(content)
    actions = plan_json["plan"]
    if not isinstance(actions, list):
        # Iterating e.g. the string "done" would yield one step per character.
        raise ValueError(
            f"Expected 'plan' to be a list, got {type(actions).__name__}"
        )
    actions = list(actions)
    if not actions or actions[-1] != "done":
        # Enforce the prompt's rule that the final entry is "done".
        actions.append("done")
    steps = [PlanStep(action=a) for a in actions]
    return Plan(plan=steps)
def think(
    step: PlanStep,
    context: Dict[str, Any],
    user_query: str
) -> Tuple[bool, Optional[PlanStep], Optional[str]]:
    """Fill in parameters for a plan step, or decide to skip it.

    - fetch_emails: pass the raw query for text-based search and date
      extraction; no LLM call is made.
    - others: ask the LLM validator whether to run the step and with which
      parameters.

    Args:
        step: The plan step to prepare.
        context: Results gathered so far; shown to the validator as JSON.
        user_query: The raw user request.

    Returns:
        ``(should_execute, updated_step, user_prompt_if_needed)``. The step
        is ``None`` when the validator says not to execute. The third
        element is always ``None`` in this implementation.

    Raises:
        ValueError: If the validator returns no content or invalid JSON.
    """
    # 1) fetch_emails -> pass the full query for text-based search and date extraction
    if step.action == "fetch_emails":
        params = FetchEmailsParams(
            query=user_query  # Pass the full query for keyword and date extraction
        )
        return True, PlanStep(action="fetch_emails", parameters=params), None

    # 2) everything else -> validate & supply params via LLM
    prompt = SYSTEM_VALIDATOR_TEMPLATE.format(
        # default=str: tool results may hold values json cannot encode
        # natively; for a prompt, their string form is good enough and
        # avoids a TypeError.
        context=json.dumps(context, indent=2, default=str),
        action=step.action,
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.0,
        # JSON mode: the reply is a bare JSON object rather than prose or a
        # markdown-fenced snippet, so json.loads below can parse it directly.
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": "Validate or supply parameters for this action."},
            {"role": "user", "content": prompt},
        ],
    )
    content = response.choices[0].message.content
    if content is None:
        # json.loads(None) would raise an unhelpful TypeError.
        raise ValueError(
            f"LLM returned no content while validating action '{step.action}'"
        )
    verdict = json.loads(content)
    if not verdict.get("should_execute", False):
        return False, None, None
    return True, PlanStep(
        action=step.action,
        parameters=verdict.get("parameters")
    ), None
def act(step: PlanStep) -> Any:
    """Dispatch a plan step to its implementation in ``TOOL_MAPPING``.

    Args:
        step: The step to run. Its parameters may be empty/``None``, a plain
            dict (e.g. the JSON object supplied by the validator), or a
            model object exposing ``model_dump()``.

    Returns:
        Whatever the mapped tool function returns.

    Raises:
        ValueError: If ``step.action`` has no entry in ``TOOL_MAPPING``.
    """
    fn = TOOL_MAPPING.get(step.action)
    if fn is None:
        raise ValueError(f"Unknown action '{step.action}'")
    params = step.parameters
    if not params:
        kwargs = {}
    elif isinstance(params, dict):
        # A plain dict has no model_dump(); copy it so the tool cannot
        # mutate the step's own parameters.
        kwargs = dict(params)
    else:
        kwargs = params.model_dump()
    return fn(**kwargs)