# MailQuery/mcp/server/swiggy_scraper.py
"""
Reusable helper to fetch Swiggy order e-mails and return a list[dict].
Usage:
from swiggy_scraper import fetch_swiggy_orders
orders = fetch_swiggy_orders("17-May-2025", "20-May-2025")
"""
import os, imaplib, json
from email import message_from_bytes
from bs4 import BeautifulSoup
from openai import OpenAI
from dotenv import load_dotenv
from datetime import datetime, timedelta
from email.utils import parsedate_to_datetime
from zoneinfo import ZoneInfo
from db_schema import init_db, get_orders_by_date_from_db, save_orders_to_db
load_dotenv()
APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")
OPENAI_KEY = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=OPENAI_KEY)
def _imap_connect():
    """Open an authenticated Gmail IMAP session on the All Mail folder.

    Reads credentials from the module-level EMAIL_ID / APP_PASSWORD
    (loaded from the environment). Caller is responsible for logout().
    """
    conn = imaplib.IMAP4_SSL("imap.gmail.com")
    conn.login(EMAIL_ID, APP_PASSWORD)
    # All Mail covers archived messages too, not just the inbox.
    conn.select('"[Gmail]/All Mail"')
    return conn
def _email_to_clean_text(msg):
    """Extract readable text from the first text/html part of *msg*.

    Non-content tags (script/style/head/meta/link) are removed, blank
    lines are dropped, and "" is returned when no HTML part exists.
    """
    html = None
    for part in msg.walk():
        if part.get_content_type() == "text/html":
            html = part.get_payload(decode=True).decode(errors="ignore")
            break
    if not html:
        return ""
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style", "head", "meta", "link"]):
        tag.decompose()
    stripped = (line.strip() for line in soup.get_text("\n").splitlines())
    return "\n".join(line for line in stripped if line)
def _get_all_dates(start_date: str, end_date: str):
start = datetime.strptime(start_date, "%d-%b-%Y")
end = datetime.strptime(end_date, "%d-%b-%Y")
delta = (end - start).days + 1
return [(start + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(delta)]
def _strip_code_fence(text: str) -> str:
    """Remove a surrounding ```/```json Markdown fence from *text*.

    Chat models frequently wrap JSON answers in a code fence even when
    instructed not to; json.loads would then choke on the backticks.
    Plain (unfenced) text is returned stripped but otherwise unchanged.
    """
    text = text.strip()
    if text.startswith("```"):
        # Drop the opening fence line ("```" or "```json") ...
        text = text.split("\n", 1)[1] if "\n" in text else ""
        # ... and a matching closing fence, if present.
        if text.rstrip().endswith("```"):
            text = text.rstrip()[:-3]
    return text.strip()


def _extract_with_llm(email_number, subject, body, email_date, email_time):
    """Extract structured order fields from one Swiggy email via the LLM.

    Args:
        email_number: 1-based index of the email within the scraped day.
        subject: email subject line.
        body: cleaned plain-text email body.
        email_date: delivery date string ("DD-Mon-YYYY").
        email_time: delivery time string ("HH:MM:SS").

    Returns:
        dict with order fields on success, or a dict carrying an
        "error" key (and "raw_response" for JSON failures) — this
        function never raises.
    """
    current_email = {
        "subject": subject,
        "body": body
    }
    prompt = f"""
You are given a Swiggy order confirmation email with a subject and body.
Extract and return only the following:
- "restaurant_name": name of the restaurant
- "delivery_address": the delivery address
- "items": a list of ordered items, each with "name", "quantity", and "price" (number)
- "total_price": the total bill paid including taxes, charges, etc.
Example output format:
{{
  "restaurant_name": "Dominos Pizza",
  "delivery_address": "123 Main St, City",
  "total_price": 567,
  "items": [
    {{ "name": "Veg Pizza", "quantity": 2, "price": 199 }},
    {{ "name": "Coke", "quantity": 1, "price": 45 }}
  ]
}}
Return only valid JSON. No extra text or comments.
{json.dumps(current_email, indent=2)}
"""
    try:
        rsp = client.chat.completions.create(
            model="gpt-4o-mini",
            temperature=0,
            messages=[
                {"role": "system", "content": "You are a precise JSON extractor."},
                {"role": "user", "content": prompt},
            ],
        )
        # content can legitimately be None (e.g. refusals); coerce to ""
        # so the failure surfaces as a JSONDecodeError, not a TypeError.
        content = rsp.choices[0].message.content or ""
        parsed_data = json.loads(_strip_code_fence(content))
        # Wrap into final structure
        return {
            "email_number": email_number,
            "order_date": email_date,
            "order_time": email_time,
            "restaurant_name": parsed_data.get("restaurant_name", ""),
            "delivery_address": parsed_data.get("delivery_address", ""),
            "items": parsed_data.get("items", []),
            "total_price": parsed_data.get("total_price", 0)
        }
    except json.JSONDecodeError as json_err:
        return {
            "email_number": email_number,
            "error": f"JSON decoding failed: {str(json_err)}",
            "raw_response": rsp.choices[0].message.content if 'rsp' in locals() else None
        }
    except Exception as e:
        return {
            "email_number": email_number,
            "error": f"Unexpected error: {str(e)}"
        }
def fetch_swiggy_orders(start_date: str, end_date: str) -> list[dict]:
    """Return Swiggy order dicts for every day in [start_date, end_date].

    Dates are "DD-Mon-YYYY" strings. Each day is served from the local
    DB cache when available; otherwise that day's Swiggy emails are
    fetched over IMAP, parsed with the LLM, persisted to the DB, and
    appended to the result.

    Returns:
        Flat list of per-order dicts (failed emails appear as dicts
        with an "error" key).
    """
    mail = _imap_connect()
    try:
        orders = []
        for date_str in _get_all_dates(start_date, end_date):
            # 1) Try loading from DB
            day_orders = get_orders_by_date_from_db(date_str)
            if day_orders:
                print(f"{date_str} loaded from DB")
                orders.extend(day_orders)
                continue

            # 2) Otherwise scrape emails for that date
            print(f"Fetching Swiggy emails for {date_str}")
            day = datetime.strptime(date_str, "%Y-%m-%d")
            this_day = day.strftime("%d-%b-%Y")
            # IMAP BEFORE is exclusive, so the next day bounds this one.
            next_day = (day + timedelta(days=1)).strftime("%d-%b-%Y")
            crit = f'(FROM "[email protected]") SINCE "{this_day}" BEFORE "{next_day}"'
            _, data = mail.search(None, crit)
            ids = data[0].split()

            scraped_orders = []
            for idx, eid in enumerate(ids, 1):
                _, msg_data = mail.fetch(eid, "(RFC822)")
                msg = message_from_bytes(msg_data[0][1])
                subject = msg.get("Subject", "")
                body_text = _email_to_clean_text(msg)
                try:
                    # Convert the Date header to IST so the recorded
                    # order_date matches the local day being scraped.
                    sent_at = parsedate_to_datetime(msg["Date"]).astimezone(ZoneInfo("Asia/Kolkata"))
                    order = _extract_with_llm(
                        idx,
                        subject,
                        body_text,
                        sent_at.strftime("%d-%b-%Y"),
                        sent_at.strftime("%H:%M:%S"),
                    )
                    scraped_orders.append(order)
                except Exception as exc:
                    # One malformed email must not abort the whole day.
                    scraped_orders.append({"email_number": idx, "error": str(exc)})

            # 3) Save newly scraped data to DB
            save_orders_to_db(date_str, scraped_orders)
            orders.extend(scraped_orders)
        return orders
    finally:
        # Close the IMAP session even when scraping fails mid-way;
        # the original leaked the connection on any exception above.
        mail.logout()