# orchestrator.py import os import json import re from typing import Any, Dict, Tuple, Optional from datetime import datetime from dotenv import load_dotenv from openai import OpenAI from schemas import Plan, PlanStep, FetchEmailsParams from tools import TOOL_MAPPING # Load .env and initialize OpenAI client load_dotenv() api_key = os.getenv("OPENAI_API_KEY") if not api_key: raise RuntimeError("Missing OPENAI_API_KEY in environment") client = OpenAI(api_key=api_key) # File paths for name mapping NAME_MAPPING_FILE = "name_mapping.json" # === Hard-coded list of available actions === SYSTEM_PLAN_PROMPT = """ You are an email assistant agent. You have access to the following actions: • fetch_emails - fetch emails using text search with sender keywords and date extraction (e.g., "swiggy emails last week") • show_email - display specific email content • analyze_emails - analyze email patterns or content • draft_reply - create a reply to an email • send_reply - send a drafted reply • done - complete the task When the user gives you a query, output _only_ valid JSON of this form: { "plan": [ "fetch_emails", ..., "done" ] } Rules: - Use "fetch_emails" for text-based email search (automatically extracts sender keywords and dates) - The final entry _must_ be "done" - If no tool is needed, return `{"plan":["done"]}` Example: For "show me emails from swiggy today" → ["fetch_emails", "done"] """ SYSTEM_VALIDATOR_TEMPLATE = """ You are a plan validator. Context (results so far): {context} Next action: {action} Reply _only_ with JSON: {{ "should_execute": , "parameters": }} """ def _load_name_mapping() -> Dict[str, str]: """Load name to email mapping from JSON file""" if not os.path.exists(NAME_MAPPING_FILE): return {} try: with open(NAME_MAPPING_FILE, "r") as f: return json.load(f) except (json.JSONDecodeError, IOError): return {} def _save_name_mapping(mapping: Dict[str, str]): """Save name to email mapping to JSON file""" with open(NAME_MAPPING_FILE, "w") as f: json.dump(mapping, f, indent=2) def store_name_email_mapping(name: str, email: str): """Store new name to email mapping""" name_mapping = _load_name_mapping() name_mapping[name.lower().strip()] = email.lower().strip() _save_name_mapping(name_mapping) def extract_sender_info(query: str) -> Dict: """ Extract sender information from user query using LLM """ system_prompt = """ You are an email query parser that extracts sender information. Given a user query, extract the sender intent - the person/entity they want emails from. This could be: - A person's name (e.g., "dev", "john smith", "dev agarwal") - A company/service (e.g., "amazon", "google", "linkedin") - An email address (e.g., "john@company.com") Examples: - "emails from dev agarwal last week" → "dev agarwal" - "show amazon emails from last month" → "amazon" - "emails from john@company.com yesterday" → "john@company.com" - "get messages from sarah" → "sarah" Return ONLY valid JSON: { "sender_intent": "extracted name, company, or email" } """ response = client.chat.completions.create( model="gpt-4o-mini", temperature=0.0, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": query} ], ) result = json.loads(response.choices[0].message.content) return result def resolve_sender_email(sender_intent: str) -> Tuple[Optional[str], bool]: """ Resolve sender intent to actual email address Returns: (email_address, needs_user_input) """ # Check if it's already an email address if "@" in sender_intent: return sender_intent.lower(), False # Load name mapping name_mapping = _load_name_mapping() # Normalize the intent (lowercase for comparison) normalized_intent = sender_intent.lower().strip() # Check direct match if normalized_intent in name_mapping: return name_mapping[normalized_intent], False # Check partial matches (fuzzy matching) for name, email in name_mapping.items(): if normalized_intent in name.lower() or name.lower() in normalized_intent: return email, False # No match found return None, True def get_plan_from_llm(user_query: str) -> Plan: """ Ask the LLM which actions to run, in order. No parameters here. """ response = client.chat.completions.create( model="gpt-4o-mini", temperature=0.0, messages=[ {"role": "system", "content": SYSTEM_PLAN_PROMPT}, {"role": "user", "content": user_query}, ], ) plan_json = json.loads(response.choices[0].message.content) steps = [PlanStep(action=a) for a in plan_json["plan"]] return Plan(plan=steps) def think( step: PlanStep, context: Dict[str, Any], user_query: str ) -> Tuple[bool, Optional[PlanStep], Optional[str]]: """ Fill in parameters or skip based on the action: - fetch_emails: pass the raw query for text-based search and date extraction - others: ask the LLM validator for params Returns: (should_execute, updated_step, user_prompt_if_needed) """ # 1) fetch_emails → pass the full query for text-based search and date extraction if step.action == "fetch_emails": params = FetchEmailsParams( query=user_query # Pass the full query for keyword and date extraction ) return True, PlanStep(action="fetch_emails", parameters=params), None # 2) everything else → validate & supply params via LLM prompt = SYSTEM_VALIDATOR_TEMPLATE.format( context=json.dumps(context, indent=2), action=step.action, ) response = client.chat.completions.create( model="gpt-4o-mini", temperature=0.0, messages=[ {"role": "system", "content": "Validate or supply parameters for this action."}, {"role": "user", "content": prompt}, ], ) verdict = json.loads(response.choices[0].message.content) if not verdict.get("should_execute", False): return False, None, None return True, PlanStep( action=step.action, parameters=verdict.get("parameters") ), None def act(step: PlanStep) -> Any: """ Dispatch to the actual implementation in tools.py. """ fn = TOOL_MAPPING.get(step.action) if fn is None: raise ValueError(f"Unknown action '{step.action}'") kwargs = step.parameters.model_dump() if step.parameters else {} return fn(**kwargs)