Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
""" | |
FastAPI Routes for Email Query System | |
""" | |
from fastapi import APIRouter, HTTPException | |
from pydantic import BaseModel, EmailStr | |
from typing import List, Dict, Optional | |
import json | |
# Import our modules | |
from query_parser import parse_email_query, store_name_email_mapping | |
from email_scraper import scrape_emails_from_sender | |
router = APIRouter() | |
# Pydantic models | |
class NaturalQuery(BaseModel):
    # Request body carrying a single free-text query. The handlers in this
    # module pass `query` straight to parse_email_query.
    query: str
class EmailMappingInput(BaseModel):
    # Request body for registering a name -> address pair; both fields are
    # forwarded to store_name_email_mapping by add_email_mapping.
    name: str
    # EmailStr makes Pydantic reject values that are not valid addresses.
    email: EmailStr
class EmailResponse(BaseModel):
    # One email entry inside EmailsResponse.emails. get_emails_from_query
    # fills every field from the same-named key of each scraped email dict.
    date: str
    time: str
    subject: str
    content: str
    message_id: str
class QueryParseResponse(BaseModel):
    # Response model built by parse_email_query_endpoint by unpacking the
    # dict returned from parse_email_query. Only `status` and `message` are
    # required; the remaining fields default to None when the parser result
    # does not supply them.
    status: str
    sender_intent: Optional[str] = None
    resolved_email: Optional[str] = None
    start_date: Optional[str] = None
    end_date: Optional[str] = None
    message: str
    error: Optional[str] = None
class EmailsResponse(BaseModel):
    # Response model returned by get_emails_from_query: the parsed query
    # details plus the list of emails that were fetched. All fields are
    # required.
    status: str
    sender_intent: str
    resolved_email: str
    start_date: str
    end_date: str
    # get_emails_from_query sets this to len() of the scraped email list.
    total_emails: int
    emails: List[EmailResponse]
    message: str
def parse_email_query_endpoint(input_data: NaturalQuery):
    """
    Parse a natural-language query and return the parser's result.

    The query text is passed to parse_email_query and the returned dict is
    unpacked into a QueryParseResponse.

    Raises:
        HTTPException: status 400 if parsing or building the response
            raises any exception; the original message is included in
            the detail.
    """
    try:
        parsed = parse_email_query(input_data.query)
        response = QueryParseResponse(**parsed)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"Query parsing failed: {exc!s}")
    return response
def add_email_mapping(mapping: EmailMappingInput):
    """
    Store a new name -> email mapping.

    Hands the name and address to store_name_email_mapping and returns a
    small confirmation dict with "status" and "message" keys.

    Raises:
        HTTPException: status 400 if storing the mapping raises any
            exception; the original message is included in the detail.
    """
    try:
        name = mapping.name
        address = mapping.email
        store_name_email_mapping(name, address)
        confirmation = f"Mapping added: '{name}' → '{address}'"
        return {"status": "success", "message": confirmation}
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"Failed to add mapping: {exc!s}")
def get_emails_from_query(input_data: NaturalQuery):
    """
    Run the complete flow: parse the query, then fetch and package emails.

    The query text is handed to parse_email_query. A "need_email_input" or
    "error" status from the parser ends the request with HTTP 400. Otherwise
    the resolved address and date range are passed to
    scrape_emails_from_sender and the results are returned as an
    EmailsResponse.

    Raises:
        HTTPException: status 400 for the two parser statuses above
            (re-raised unchanged); status 500 wrapping any other exception.
    """
    try:
        # Step 1: parse the query and bail out early on unusable results.
        parsed = parse_email_query(input_data.query)
        status = parsed["status"]

        if status == "need_email_input":
            problem = {
                "type": "need_email_input",
                "sender_intent": parsed["sender_intent"],
                "message": parsed["message"],
            }
            raise HTTPException(status_code=400, detail=problem)

        if status == "error":
            raise HTTPException(status_code=400, detail=parsed["message"])

        # Step 2: fetch the emails for the resolved sender and date range.
        sender_address = parsed["resolved_email"]
        range_start = parsed["start_date"]
        range_end = parsed["end_date"]
        emails = scrape_emails_from_sender(sender_address, range_start, range_end)

        # Step 3: convert each scraped dict into a response model.
        email_responses = []
        for item in emails:
            email_responses.append(
                EmailResponse(
                    date=item["date"],
                    time=item["time"],
                    subject=item["subject"],
                    content=item["content"],
                    message_id=item["message_id"],
                )
            )

        found = len(emails)
        return EmailsResponse(
            status="success",
            sender_intent=parsed["sender_intent"],
            resolved_email=sender_address,
            start_date=range_start,
            end_date=range_end,
            total_emails=found,
            emails=email_responses,
            message=f"Found {found} emails from {sender_address}",
        )
    except HTTPException:
        # Deliberate 4xx responses raised above must not be turned into 500s.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Email retrieval failed: {exc!s}")
def view_name_mappings():
    """
    Return every stored name -> email mapping.

    Loads the mappings through query_parser._load_name_mapping (a private
    helper, imported lazily inside the function) and returns them together
    with their count.

    Raises:
        HTTPException: status 500 if importing the helper or loading the
            mappings raises any exception.
    """
    try:
        # Imported inside the try so an import failure is also reported as 500.
        from query_parser import _load_name_mapping as load_mappings

        stored = load_mappings()
        summary = {"status": "success"}
        summary["total_mappings"] = len(stored)
        summary["mappings"] = stored
        return summary
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to load mappings: {exc!s}")
def health_check():
    """
    Report that the service is up.

    Returns a fixed dict with "status" and "message" keys; no checks are
    actually performed.
    """
    return dict(
        status="healthy",
        message="Email query system is running",
    )
# For testing - manual endpoint to add mapping and then query
def complete_email_flow(input_data: dict):
    """
    Test handler for the complete flow with an optional mapping.

    Expected input:
    {
        "query": "emails from john last week",
        "mapping": {"name": "john", "email": "john@example.com"}  # optional
    }

    If a mapping is supplied it is stored first via
    store_name_email_mapping; the query is then parsed and, when the sender
    resolves, emails are fetched with scrape_emails_from_sender.

    Returns:
        A dict with status "need_mapping" (plus the parser's message and
        sender_intent) when the parser reports "need_email_input";
        otherwise a dict with status "success", the query, the parser
        result, the total email count and at most the first five emails.

    Raises:
        HTTPException: status 400 when the query is missing/empty or the
            parser reports an "error" status; status 500 wrapping any
            other exception.
    """
    try:
        query = input_data.get("query")
        mapping = input_data.get("mapping")

        if not query:
            raise HTTPException(status_code=400, detail="Query is required")

        # Add mapping if provided, so the parse below can resolve the name.
        if mapping:
            store_name_email_mapping(mapping["name"], mapping["email"])

        # Parse and get emails
        parsed_result = parse_email_query(query)
        status = parsed_result["status"]

        if status == "need_email_input":
            return {
                "status": "need_mapping",
                "message": parsed_result["message"],
                "sender_intent": parsed_result["sender_intent"],
            }

        # Mirror get_emails_from_query: a parser error is a client error and
        # must not fall through to the scraping step.
        if status == "error":
            raise HTTPException(status_code=400, detail=parsed_result["message"])

        # Get emails
        emails = scrape_emails_from_sender(
            parsed_result["resolved_email"],
            parsed_result["start_date"],
            parsed_result["end_date"],
        )

        return {
            "status": "success",
            "query": query,
            "parsed": parsed_result,
            "total_emails": len(emails),
            "emails": emails[:5],  # Return first 5 emails
        }
    except HTTPException:
        # Without this clause the 400s raised above would be caught by the
        # generic handler below and re-reported as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e