# NOTE: removed non-Python page-scrape artifacts (Hugging Face Spaces UI header,
# file-size line, commit hash, and line-number gutter) that preceded the module.
"""
Reusable helper to fetch Swiggy order e-mails and return a list[dict].
Usage:
from swiggy_scraper import fetch_swiggy_orders
orders = fetch_swiggy_orders("17-May-2025", "20-May-2025")
"""
import os, imaplib, json
from email import message_from_bytes
from bs4 import BeautifulSoup
from openai import OpenAI
from dotenv import load_dotenv
from datetime import datetime, timedelta
from email.utils import parsedate_to_datetime
from zoneinfo import ZoneInfo
from db_schema import init_db, get_orders_by_date_from_db, save_orders_to_db
# Load credentials from a local .env file so secrets stay out of the source tree.
load_dotenv()
APP_PASSWORD = os.getenv("APP_PASSWORD")  # Gmail app-specific password for IMAP login
EMAIL_ID = os.getenv("EMAIL_ID")  # Gmail address whose inbox is scraped
OPENAI_KEY = os.getenv("OPENAI_API_KEY")  # key for the extraction LLM
# Shared OpenAI client used by _extract_with_llm for every email.
client = OpenAI(api_key=OPENAI_KEY)
def _imap_connect():
    """Open an authenticated IMAP connection to Gmail.

    Returns:
        imaplib.IMAP4_SSL: a logged-in connection with the
        '[Gmail]/All Mail' mailbox selected, ready for search/fetch.
    """
    conn = imaplib.IMAP4_SSL("imap.gmail.com")
    conn.login(EMAIL_ID, APP_PASSWORD)
    conn.select('"[Gmail]/All Mail"')
    return conn
def _email_to_clean_text(msg):
    """Return the first text/html part of *msg* as whitespace-trimmed plain text.

    Messages with no HTML part yield an empty string. Script, style, head,
    meta, and link tags are removed before text extraction, and blank lines
    are dropped from the output.
    """
    html_body = None
    for part in msg.walk():
        if part.get_content_type() == "text/html":
            html_body = part.get_payload(decode=True).decode(errors="ignore")
            break
    if not html_body:
        return ""
    soup = BeautifulSoup(html_body, "html.parser")
    for tag in soup(["script", "style", "head", "meta", "link"]):
        tag.decompose()
    stripped = (line.strip() for line in soup.get_text("\n").splitlines())
    return "\n".join(line for line in stripped if line)
def _get_all_dates(start_date: str, end_date: str):
start = datetime.strptime(start_date, "%d-%b-%Y")
end = datetime.strptime(end_date, "%d-%b-%Y")
delta = (end - start).days + 1
return [(start + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(delta)]
def _extract_with_llm(email_number, subject, body, email_date, email_time):
    """Extract structured order details from one Swiggy email via the LLM.

    Args:
        email_number: 1-based index of the email within the day's batch.
        subject: email subject line.
        body: plain-text email body.
        email_date: order date string ("%d-%b-%Y") from the email header.
        email_time: order time string ("%H:%M:%S") from the email header.

    Returns:
        dict: order fields on success; on failure, a dict with an "error"
        key (plus "raw_response" when JSON parsing failed).
    """
    current_email = {
        "subject": subject,
        "body": body
    }
    prompt = f"""
You are given a Swiggy order confirmation email with a subject and body.
Extract and return only the following:
- "restaurant_name": name of the restaurant
- "delivery_address": the delivery address
- "items": a list of ordered items, each with "name", "quantity", and "price" (number)
- "total_price": the total bill paid including taxes, charges, etc.
Example output format:
{{
"restaurant_name": "Dominos Pizza",
"delivery_address": "123 Main St, City",
"total_price": 567,
"items": [
{{ "name": "Veg Pizza", "quantity": 2, "price": 199 }},
{{ "name": "Coke", "quantity": 1, "price": 45 }}
]
}}
Return only valid JSON. No extra text or comments.
{json.dumps(current_email, indent=2)}
"""
    # Captured before parsing so the except-clauses can report it without
    # the fragile `'rsp' in locals()` check the previous version used.
    raw_content = None
    try:
        rsp = client.chat.completions.create(
            model="gpt-4o-mini",
            temperature=0,
            # JSON mode forces the model to emit a single valid JSON object,
            # eliminating most parse failures up front.
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": "You are a precise JSON extractor."},
                {"role": "user", "content": prompt},
            ],
        )
        raw_content = rsp.choices[0].message.content or ""
        # Defensive: strip markdown code fences some models still wrap around JSON.
        cleaned = raw_content.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned.strip("`").strip()
            if cleaned.startswith("json"):
                cleaned = cleaned[4:]
        parsed_data = json.loads(cleaned)
        # Wrap the parsed fields into the final record shape.
        return {
            "email_number": email_number,
            "order_date": email_date,
            "order_time": email_time,
            "restaurant_name": parsed_data.get("restaurant_name", ""),
            "delivery_address": parsed_data.get("delivery_address", ""),
            "items": parsed_data.get("items", []),
            "total_price": parsed_data.get("total_price", 0)
        }
    except json.JSONDecodeError as json_err:
        # Surface the raw model output so the failure can be diagnosed later.
        return {
            "email_number": email_number,
            "error": f"JSON decoding failed: {str(json_err)}",
            "raw_response": raw_content
        }
    except Exception as e:
        return {
            "email_number": email_number,
            "error": f"Unexpected error: {str(e)}"
        }
def fetch_swiggy_orders(start_date: str, end_date: str) -> list[dict]:
    """Return Swiggy orders between two dates, using the DB as a day-level cache.

    For each day in the inclusive range, previously-scraped orders are served
    from the database; otherwise Gmail is searched for that day's Swiggy
    confirmation emails, each is parsed via the LLM, and the results are
    persisted before being returned.

    Args:
        start_date: first day, "%d-%b-%Y" (e.g. "17-May-2025").
        end_date: last day, same format (inclusive).

    Returns:
        list[dict]: one dict per order, or per failed extraction
        (error dicts carry an "error" key).
    """
    mail = _imap_connect()
    orders = []
    try:
        for date_str in _get_all_dates(start_date, end_date):
            # 1) Serve from the DB cache when this day was already scraped.
            day_orders = get_orders_by_date_from_db(date_str)
            if day_orders:
                print(f"{date_str} loaded from DB")
                orders.extend(day_orders)
                continue
            # 2) Otherwise scrape Gmail for exactly this one day.
            print(f"Fetching Swiggy emails for {date_str}")
            day = datetime.strptime(date_str, "%Y-%m-%d")
            this_day = day.strftime("%d-%b-%Y")
            next_day = (day + timedelta(days=1)).strftime("%d-%b-%Y")
            # IMAP SINCE/BEFORE are date-granular: SINCE today BEFORE tomorrow
            # selects a single calendar day.
            crit = f'(FROM "[email protected]") SINCE "{this_day}" BEFORE "{next_day}"'
            _, data = mail.search(None, crit)
            # Guard: an empty search result yields b"" -> no ids.
            ids = data[0].split() if data and data[0] else []
            scraped_orders = []
            for idx, eid in enumerate(ids, 1):
                _, msg_data = mail.fetch(eid, "(RFC822)")
                msg = message_from_bytes(msg_data[0][1])
                subject = msg.get("Subject", "")
                body_text = _email_to_clean_text(msg)
                try:
                    # Convert the header timestamp to IST; named `sent_at` so it
                    # no longer shadows the day-loop datetime.
                    sent_at = parsedate_to_datetime(msg["Date"]).astimezone(
                        ZoneInfo("Asia/Kolkata")
                    )
                    order = _extract_with_llm(
                        idx,
                        subject,
                        body_text,
                        sent_at.strftime("%d-%b-%Y"),
                        sent_at.strftime("%H:%M:%S"),
                    )
                    scraped_orders.append(order)
                except Exception as exc:
                    scraped_orders.append({"email_number": idx, "error": str(exc)})
            # 3) Persist the freshly scraped day so future calls hit the cache.
            save_orders_to_db(date_str, scraped_orders)
            orders.extend(scraped_orders)
    finally:
        # Always release the IMAP connection, even if scraping fails mid-way
        # (the original leaked the connection on any exception).
        mail.logout()
    return orders