"""
Reusable helper to fetch Swiggy order e-mails and return a list[dict].

Usage:
    from swiggy_scraper import fetch_swiggy_orders
    orders = fetch_swiggy_orders("17-May-2025", "20-May-2025")
"""
# Standard library.
import imaplib
import json
import os
from datetime import datetime, timedelta
from email import message_from_bytes
from email.utils import parsedate_to_datetime
from zoneinfo import ZoneInfo

# Third-party.
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from openai import OpenAI

# Local.
from db_schema import init_db, get_orders_by_date_from_db, save_orders_to_db

# Load a local .env (if any) before reading credentials from the environment.
load_dotenv()
APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")
OPENAI_KEY = os.getenv("OPENAI_API_KEY")

# Module-level OpenAI client shared by all extraction calls.
client = OpenAI(api_key=OPENAI_KEY)
def _imap_connect():
    """Open an authenticated Gmail IMAP session.

    Logs in with the module-level credentials and selects the
    "[Gmail]/All Mail" folder so searches cover every label.

    Returns:
        imaplib.IMAP4_SSL: a logged-in connection with the mailbox selected.
    """
    conn = imaplib.IMAP4_SSL("imap.gmail.com")
    conn.login(EMAIL_ID, APP_PASSWORD)
    conn.select('"[Gmail]/All Mail"')
    return conn
def _email_to_clean_text(msg):
    """Extract readable text from the first HTML part of an email message.

    Walks the MIME tree, decodes the first ``text/html`` part using the
    charset the part itself declares (falling back to UTF-8), strips tags
    that carry no visible content, and returns the remaining text as
    stripped, non-empty lines joined with newlines.

    Args:
        msg: an ``email.message.Message`` instance.

    Returns:
        str: cleaned visible text, or "" when no decodable HTML part exists.
    """
    html = None
    for part in msg.walk():
        if part.get_content_type() != "text/html":
            continue
        payload = part.get_payload(decode=True)
        if payload is None:
            # Multipart containers (and malformed parts) have no payload.
            continue
        # Fix: honour the part's declared charset instead of assuming the
        # default encoding; fall back to UTF-8 for unknown charsets.
        charset = part.get_content_charset() or "utf-8"
        try:
            html = payload.decode(charset, errors="ignore")
        except LookupError:
            html = payload.decode("utf-8", errors="ignore")
        break
    if not html:
        return ""
    soup = BeautifulSoup(html, "html.parser")
    # Remove elements that never contribute visible text.
    for tag in soup(["script", "style", "head", "meta", "link"]):
        tag.decompose()
    return "\n".join(
        line.strip() for line in soup.get_text("\n").splitlines() if line.strip()
    )
def _get_all_dates(start_date: str, end_date: str): | |
start = datetime.strptime(start_date, "%d-%b-%Y") | |
end = datetime.strptime(end_date, "%d-%b-%Y") | |
delta = (end - start).days + 1 | |
return [(start + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(delta)] | |
def _extract_with_llm(email_number, subject, body, email_date, email_time):
    """Extract structured order fields from one Swiggy email via the LLM.

    Args:
        email_number: 1-based index of the email within the day's batch.
        subject: email subject line.
        body: cleaned plain-text body of the email.
        email_date: email's date in IST, formatted ``%d-%b-%Y``.
        email_time: email's time in IST, formatted ``%H:%M:%S``.

    Returns:
        dict: the parsed order merged with the email metadata, or an error
        dict of the form ``{"email_number", "error"[, "raw_response"]}``.
    """
    current_email = {
        "subject": subject,
        "body": body
    }
    prompt = f"""
You are given a Swiggy order confirmation email with a subject and body.
Extract and return only the following:
- "restaurant_name": name of the restaurant
- "delivery_address": the delivery address
- "items": a list of ordered items, each with "name", "quantity", and "price" (number)
- "total_price": the total bill paid including taxes, charges, etc.
Example output format:
{{
"restaurant_name": "Dominos Pizza",
"delivery_address": "123 Main St, City",
"total_price": 567,
"items": [
{{ "name": "Veg Pizza", "quantity": 2, "price": 199 }},
{{ "name": "Coke", "quantity": 1, "price": 45 }}
]
}}
Return only valid JSON. No extra text or comments.
{json.dumps(current_email, indent=2)}
"""
    # Fix: track the raw response explicitly instead of probing locals().
    raw = None
    try:
        rsp = client.chat.completions.create(
            model="gpt-4o-mini",
            temperature=0,
            messages=[
                {"role": "system", "content": "You are a precise JSON extractor."},
                {"role": "user", "content": prompt},
            ],
        )
        raw = rsp.choices[0].message.content
        # Fix: models sometimes wrap the JSON in markdown code fences despite
        # the instructions; strip a leading ```/```json line and a trailing
        # ``` before parsing so those responses don't fail as decode errors.
        cleaned = raw.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
            cleaned = cleaned.rstrip()
            if cleaned.endswith("```"):
                cleaned = cleaned[:-3]
        parsed_data = json.loads(cleaned)
        # Wrap into the final structure; missing keys get safe defaults.
        return {
            "email_number": email_number,
            "order_date": email_date,
            "order_time": email_time,
            "restaurant_name": parsed_data.get("restaurant_name", ""),
            "delivery_address": parsed_data.get("delivery_address", ""),
            "items": parsed_data.get("items", []),
            "total_price": parsed_data.get("total_price", 0)
        }
    except json.JSONDecodeError as json_err:
        return {
            "email_number": email_number,
            "error": f"JSON decoding failed: {str(json_err)}",
            # None when the API call itself failed before any content arrived.
            "raw_response": raw
        }
    except Exception as e:
        return {
            "email_number": email_number,
            "error": f"Unexpected error: {str(e)}"
        }
def fetch_swiggy_orders(start_date: str, end_date: str) -> list[dict]:
    """Return Swiggy order dicts for every day in [start_date, end_date].

    For each day: serve cached rows from the DB when present; otherwise
    search Gmail over IMAP for Swiggy confirmation emails on that day,
    extract each one with the LLM, persist the results, and include them.

    Args:
        start_date: first day, formatted ``%d-%b-%Y`` (e.g. "17-May-2025").
        end_date: last day, same format, inclusive.

    Returns:
        list[dict]: one dict per order email; failed extractions appear as
        ``{"email_number": ..., "error": ...}`` entries.
    """
    mail = _imap_connect()
    # Fix: guarantee the IMAP session is closed even when a day's scrape
    # raises (previously an exception leaked the connection).
    try:
        orders = []
        for date_str in _get_all_dates(start_date, end_date):
            # 1) Try loading from DB
            day_orders = get_orders_by_date_from_db(date_str)
            if day_orders:
                print(f"{date_str} loaded from DB")
                orders.extend(day_orders)
                continue
            # 2) Otherwise scrape emails for that date
            print(f"Fetching Swiggy emails for {date_str}")
            day = datetime.strptime(date_str, "%Y-%m-%d")
            this_day = day.strftime("%d-%b-%Y")
            next_day = (day + timedelta(days=1)).strftime("%d-%b-%Y")
            # IMAP SINCE/BEFORE bound the search to this one calendar day.
            crit = f'(FROM "[email protected]") SINCE "{this_day}" BEFORE "{next_day}"'
            _, data = mail.search(None, crit)
            ids = data[0].split()
            scraped_orders = []
            for idx, eid in enumerate(ids, 1):
                _, msg_data = mail.fetch(eid, "(RFC822)")
                msg = message_from_bytes(msg_data[0][1])
                subject = msg.get("Subject", "")
                body_text = _email_to_clean_text(msg)
                try:
                    # Normalize the Date header to IST before formatting.
                    received = parsedate_to_datetime(msg["Date"]).astimezone(ZoneInfo("Asia/Kolkata"))
                    email_date = received.strftime("%d-%b-%Y")
                    email_time = received.strftime("%H:%M:%S")
                    order = _extract_with_llm(idx, subject, body_text, email_date, email_time)
                    scraped_orders.append(order)
                except Exception as exc:
                    # Keep a placeholder so the email count stays complete.
                    scraped_orders.append({"email_number": idx, "error": str(exc)})
            # 3) Save newly scraped data to DB
            save_orders_to_db(date_str, scraped_orders)
            orders.extend(scraped_orders)
        return orders
    finally:
        mail.logout()