sales_assistant / web_scraper.py
ZweliM's picture
Create web_scraper.py
7a511b0 verified
import csv
import urllib.parse

from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright
def scrape_hificorp(page, product_name: str) -> dict | None:
"""
Scrape HiFiCorp for the given product_name.
Returns a dict with keys: title, normal_price, promotion_price, source, product_link
or None if no product found.
"""
search_url = (
"https://www.hificorp.co.za/catalogsearch/result/?q="
+ urllib.parse.quote_plus(product_name)
)
page.goto(search_url, timeout=120_000)
page.wait_for_selector(".product-item-link", timeout=60_000)
product_url = page.locator(
".product-item-link").first.get_attribute("href")
if not product_url:
return None
page.goto(product_url, timeout=120_000)
page.wait_for_selector("h1.page-title", timeout=60_000)
title = page.locator("h1.page-title").inner_text().strip()
# Promotion (final) price
try:
promotion_price = (
page.locator('[data-price-type="finalPrice"] .price')
.first.inner_text()
.strip()
)
except Exception:
promotion_price = None
# Old (normal) price, if present
try:
old_nodes = page.locator('[data-price-type="oldPrice"] .price')
normal_price = (
old_nodes.first.inner_text().strip() if old_nodes.count() else None
)
except Exception:
normal_price = None
# Fallback if no old price
normal_price = normal_price or promotion_price
return {
"title": title,
"normal_price": normal_price,
"promotion_price": promotion_price,
"source": "HiFiCorp",
"product_link": product_url,
}
def scrape_incredible(page, product_name: str) -> dict | None:
"""
Scrape Incredible Connection for the given product_name.
Returns a dict with keys: title, normal_price, promotion_price, source, product_link
or None if no product found.
"""
search_url = (
"https://www.incredible.co.za/catalogsearch/result/?q="
+ urllib.parse.quote_plus(product_name)
)
page.goto(search_url, timeout=120_000)
page.wait_for_selector(".product-item-link", timeout=60_000)
product_url = page.locator(
".product-item-link").first.get_attribute("href")
if not product_url:
return None
page.goto(product_url, timeout=120_000)
page.wait_for_selector("h1.page-title", timeout=60_000)
title = page.locator("h1.page-title").inner_text().strip()
try:
promotion_price = (
page.locator('[data-price-type="finalPrice"] .price')
.first.inner_text()
.strip()
)
except Exception:
promotion_price = None
try:
old_nodes = page.locator('[data-price-type="oldPrice"] .price')
normal_price = (
old_nodes.first.inner_text().strip() if old_nodes.count() else None
)
except Exception:
normal_price = None
normal_price = normal_price or promotion_price
return {
"title": title,
"normal_price": normal_price,
"promotion_price": promotion_price,
"source": "Incredible Connection",
"product_link": product_url,
}
def search_product(product_name: str) -> list[dict]:
    """
    Uses Playwright to scrape HiFiCorp and Incredible Connection for product_name.
    Returns a list of dictionaries, each dict with keys:
    title, normal_price, promotion_price, source, product_link.
    If Playwright cannot run or no products found, returns an empty list.

    The return value is always a list. Failures are printed, never returned,
    so callers can iterate the result without type checks. A failure in one
    retailer's scraper does not discard results from the other.
    """
    results = []
    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(
                headless=True,
                args=[
                    "--no-sandbox",
                    "--disable-setuid-sandbox",
                    "--disable-dev-shm-usage",
                ],
            )
            try:
                page = browser.new_page()
                # Both scrapers share one page; each starts with its own goto().
                for label, scraper in (
                    ("HiFiCorp", scrape_hificorp),
                    ("Incredible Connection", scrape_incredible),
                ):
                    try:
                        data = scraper(page, product_name)
                    except Exception as e:
                        print(f"{label} scraping error:", type(e).__name__, e)
                        continue
                    if data:
                        results.append(data)
            finally:
                # Close the browser even when a scraper or new_page() fails.
                browser.close()
    except NotImplementedError:
        # Playwright cannot launch a browser in this environment
        print("Playwright NotImplementedError: scraping skipped.")
        return []
    except Exception as e:
        # Any other Playwright/browser launch error
        print("Playwright launch error:", type(e).__name__, e)
        return []
    return results
def get_scraped_product_data(product_name: str) -> list[dict]:
    """
    Wrapper function to search for product data.
    Returns a list of dictionaries with product details.

    When products are found they are also written to "scraped.csv" in the
    current working directory (header row plus one row per product).
    Returns an empty list, and writes nothing, when product_name is empty
    or the search yields no usable results.
    """
    if not product_name:
        return []

    results = search_product(product_name)
    # Only a non-empty list of product dicts is usable; an empty list or any
    # non-list value is reported as "nothing found".
    if not isinstance(results, list) or not results:
        return []

    columns = ["title", "normal_price", "promotion_price", "source", "product_link"]
    # newline="" lets the csv module control line endings, per its documentation.
    with open("scraped.csv", "w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=columns, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(results)
    return results
def search_your_product(query: str) -> list[dict] | str:
    """
    Search for a product using the provided query string.

    Returns the string "No results found." when the search yields nothing
    usable. Otherwise returns a list of dicts, each restricted to the keys
    title, normal_price, promotion_price, source and product_link.
    Raises KeyError if a result is missing one of those keys.
    """
    found = search_product(query)
    # An empty list or a non-list value both mean there is nothing to show.
    if not isinstance(found, list) or not found:
        return "No results found."

    fields = ("title", "normal_price", "promotion_price", "source", "product_link")
    return [{key: item[key] for key in fields} for item in found]
# For debugging or manual runs: prompt for a product name, run the search and
# print every result found.
if __name__ == "__main__":
    query = input("Enter product name: ")
    json_out = search_product(query)
    # An empty list or a non-list value both mean there is nothing to print.
    if not isinstance(json_out, list) or not json_out:
        print("No results found.")
    else:
        for item in json_out:
            print(f"Title: {item['title']}")
            print(f"Normal Price: {item['normal_price']}")
            print(f"Promotion Price: {item['promotion_price']}")
            print(f"Source: {item['source']}")
            print(f"Product Link: {item['product_link']}")
            print("-" * 40)
        print(f"Found {len(json_out)} results for '{query}'.")
        print("Search complete!")