ZweliM committed on
Commit 7a511b0 · verified · 1 Parent(s): e3eda0f

Create web_scraper.py
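Adds a Playwright-based scraper for HiFiCorp and Incredible Connection price data. A minimal usage sketch, assuming Playwright and its Chromium browser are installed (pip install playwright, then playwright install chromium); the product name below is only a hypothetical example:

    from web_scraper import get_scraped_product_data

    # "PlayStation 5" is an illustrative query string, not a required value.
    results = get_scraped_product_data("PlayStation 5")
    for item in results:
        print(item["source"], item["promotion_price"], item["product_link"])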

Files changed (1)
  1. web_scraper.py +214 -0
web_scraper.py ADDED
@@ -0,0 +1,214 @@
+from playwright.sync_api import sync_playwright
+import urllib.parse
+
+import pandas as pd  # used for the CSV export in get_scraped_product_data
+
+
+def scrape_hificorp(page, product_name: str) -> dict | None:
+    """
+    Scrape HiFiCorp for the given product_name.
+    Returns a dict with keys: title, normal_price, promotion_price, source, product_link
+    or None if no product found.
+    """
+    search_url = (
+        "https://www.hificorp.co.za/catalogsearch/result/?q="
+        + urllib.parse.quote_plus(product_name)
+    )
+    page.goto(search_url, timeout=120_000)
+    page.wait_for_selector(".product-item-link", timeout=60_000)
+
+    product_url = page.locator(".product-item-link").first.get_attribute("href")
+    if not product_url:
+        return None
+
+    page.goto(product_url, timeout=120_000)
+    page.wait_for_selector("h1.page-title", timeout=60_000)
+
+    title = page.locator("h1.page-title").inner_text().strip()
+
+    # Promotion (final) price
+    try:
+        promotion_price = (
+            page.locator('[data-price-type="finalPrice"] .price')
+            .first.inner_text()
+            .strip()
+        )
+    except Exception:
+        promotion_price = None
+
+    # Old (normal) price, if present
+    try:
+        old_nodes = page.locator('[data-price-type="oldPrice"] .price')
+        normal_price = (
+            old_nodes.first.inner_text().strip() if old_nodes.count() else None
+        )
+    except Exception:
+        normal_price = None
+
+    # Fall back to the promotion price if there is no old price
+    normal_price = normal_price or promotion_price
+
+    return {
+        "title": title,
+        "normal_price": normal_price,
+        "promotion_price": promotion_price,
+        "source": "HiFiCorp",
+        "product_link": product_url,
+    }
+
+
+def scrape_incredible(page, product_name: str) -> dict | None:
+    """
+    Scrape Incredible Connection for the given product_name.
+    Returns a dict with keys: title, normal_price, promotion_price, source, product_link
+    or None if no product found.
+    """
+    search_url = (
+        "https://www.incredible.co.za/catalogsearch/result/?q="
+        + urllib.parse.quote_plus(product_name)
+    )
+    page.goto(search_url, timeout=120_000)
+    page.wait_for_selector(".product-item-link", timeout=60_000)
+
+    product_url = page.locator(".product-item-link").first.get_attribute("href")
+    if not product_url:
+        return None
+
+    page.goto(product_url, timeout=120_000)
+    page.wait_for_selector("h1.page-title", timeout=60_000)
+
+    title = page.locator("h1.page-title").inner_text().strip()
+
+    # Promotion (final) price
+    try:
+        promotion_price = (
+            page.locator('[data-price-type="finalPrice"] .price')
+            .first.inner_text()
+            .strip()
+        )
+    except Exception:
+        promotion_price = None
+
+    # Old (normal) price, if present
+    try:
+        old_nodes = page.locator('[data-price-type="oldPrice"] .price')
+        normal_price = (
+            old_nodes.first.inner_text().strip() if old_nodes.count() else None
+        )
+    except Exception:
+        normal_price = None
+
+    # Fall back to the promotion price if there is no old price
+    normal_price = normal_price or promotion_price
+
+    return {
+        "title": title,
+        "normal_price": normal_price,
+        "promotion_price": promotion_price,
+        "source": "Incredible Connection",
+        "product_link": product_url,
+    }
+
+
+def search_product(product_name: str) -> list[dict]:
+    """
+    Uses Playwright to scrape HiFiCorp and Incredible Connection for product_name.
+    Returns a list of dictionaries, each dict with keys:
+    title, normal_price, promotion_price, source, product_link.
+
+    If Playwright cannot run or no products are found, returns an empty list.
+    """
+    results = []
+
+    try:
+        with sync_playwright() as p:
+            browser = p.chromium.launch(
+                headless=True,
+                args=["--no-sandbox", "--disable-setuid-sandbox",
+                      "--disable-dev-shm-usage"],
+            )
+            page = browser.new_page()
+
+            # Scrape HiFiCorp
+            try:
+                hifi_data = scrape_hificorp(page, product_name)
+                if hifi_data:
+                    results.append(hifi_data)
+            except Exception as e:
+                print("HiFiCorp scraping error:", type(e).__name__, e)
+
+            # Scrape Incredible Connection
+            try:
+                incredible_data = scrape_incredible(page, product_name)
+                if incredible_data:
+                    results.append(incredible_data)
+            except Exception as e:
+                print("Incredible Connection scraping error:", type(e).__name__, e)
+
+            browser.close()
+
+    except NotImplementedError:
+        # Playwright cannot launch a browser in this environment
+        print("Playwright NotImplementedError: scraping skipped.")
+        return []
+
+    except Exception as e:
+        # Any other Playwright/browser launch error
+        print("Playwright launch error:", type(e).__name__, e)
+        return []
+
+    return results
+
+
+def get_scraped_product_data(product_name: str):
+    """
+    Wrapper function to search for product data.
+    Returns a list of dictionaries with product details.
+    """
+    if not product_name:
+        return []
+
+    results = search_product(product_name)
+
+    if not results:
+        return []
+
+    # Save the scraped results to CSV for later inspection.
+    pd.DataFrame(results).to_csv("scraped.csv", index=False)
+
+    return results
+
+
+def search_your_product(query: str):
+    """Search for a product using the provided query string."""
+
+    json_out = search_product(query)
+    if not json_out:
+        return "No results found."
+    else:
+        product = []
+        for item in json_out:
+            product.append({
+                "title": item["title"],
+                "normal_price": item["normal_price"],
+                "promotion_price": item["promotion_price"],
+                "source": item["source"],
+                "product_link": item["product_link"]
+            })
+
+        return product
+
+
+# For debugging or manual runs:
+if __name__ == "__main__":
+    query = input("Enter product name: ")
+    json_out = search_product(query)
+    if not json_out:
+        print("No results found.")
+    else:
+        product = []
+        for item in json_out:
+            product.append({
+                "title": item["title"],
+                "normal_price": item["normal_price"],
+                "promotion_price": item["promotion_price"],
+                "source": item["source"],
+                "product_link": item["product_link"]
+            })
+
+        for entry in product:
+            print(f"Title: {entry['title']}")
+            print(f"Normal Price: {entry['normal_price']}")
+            print(f"Promotion Price: {entry['promotion_price']}")
+            print(f"Source: {entry['source']}")
+            print(f"Product Link: {entry['product_link']}")
+            print("-" * 40)
+        print(f"Found {len(product)} results for '{query}'.")
+    print("Search complete!")