Spaces:
Sleeping
Sleeping
File size: 3,677 Bytes
7c68554 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File: db_schema.py
import sqlite3
DB_NAME = "swiggy_orders.db"
def init_db(db_path: str = DB_NAME) -> None:
"""
Initialize the SQLite database with the necessary tables.
"""
conn = sqlite3.connect(db_path)
c = conn.cursor()
# Orders metadata
c.execute(
"""
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email_number INTEGER,
order_date TEXT,
order_time TEXT,
restaurant_name TEXT,
delivery_address TEXT,
total_price REAL
)
"""
)
# Individual items per order
c.execute(
"""
CREATE TABLE IF NOT EXISTS order_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_id INTEGER,
item_name TEXT,
quantity INTEGER,
price REAL,
FOREIGN KEY(order_id) REFERENCES orders(id)
)
"""
)
conn.commit()
conn.close()
def get_db_connection(db_path: str = DB_NAME) -> sqlite3.Connection:
"""Return a new connection to the database."""
return sqlite3.connect(db_path)
def get_orders_by_date_from_db(date_str: str) -> list[dict]:
"""
Fetch all orders and their items for a given date from the database.
"""
conn = get_db_connection()
c = conn.cursor()
c.execute(
"SELECT id, email_number, order_time, restaurant_name, delivery_address, total_price"
" FROM orders WHERE order_date = ?",
(date_str,)
)
orders = []
for order_id, email_number, order_time, restaurant_name, delivery_address, total_price in c.fetchall():
# fetch items for this order
c.execute(
"SELECT item_name, quantity, price FROM order_items WHERE order_id = ?",
(order_id,)
)
items = [
{"name": name, "quantity": qty, "price": price}
for name, qty, price in c.fetchall()
]
orders.append({
"email_number": email_number,
"order_date": date_str,
"order_time": order_time,
"restaurant_name": restaurant_name,
"delivery_address": delivery_address,
"items": items,
"total_price": total_price
})
conn.close()
return orders
def save_orders_to_db(date_str: str, orders: list[dict]) -> None:
"""
Insert scraped orders and their items for a given date into the database.
"""
conn = get_db_connection()
c = conn.cursor()
for order in orders:
c.execute(
"""
INSERT INTO orders
(email_number, order_date, order_time, restaurant_name, delivery_address, total_price)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
order["email_number"],
date_str,
order["order_time"],
order["restaurant_name"],
order["delivery_address"],
order["total_price"]
)
)
order_id = c.lastrowid
for item in order.get("items", []):
c.execute(
"""
INSERT INTO order_items
(order_id, item_name, quantity, price)
VALUES (?, ?, ?, ?)
""",
(
order_id,
item["name"],
item["quantity"],
item["price"]
)
)
conn.commit()
conn.close()
|