import json
from typing import List, Set, Tuple, Type

from pydantic import BaseModel

from crawl4ai import (
    AsyncWebCrawler,
    BrowserConfig,
    CacheMode,
    CrawlerRunConfig,
    LLMExtractionStrategy,
)

from utils import is_duplicated
from config import LLM_MODEL, API_TOKEN


def get_browser_config() -> BrowserConfig:
    """
    Returns the browser configuration for the crawler.

    Returns:
        BrowserConfig: The configuration settings for the browser.
    """
    return BrowserConfig(
        browser_type="chromium",  # Use the Chromium browser engine
        headless=True,  # Run without a visible browser window
        verbose=True,  # Log crawler activity for easier debugging
    )
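

# The returned BrowserConfig is meant to be handed to AsyncWebCrawler, e.g.
# `async with AsyncWebCrawler(config=get_browser_config()) as crawler: ...`
# (see the illustrative usage sketch at the bottom of this file).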


def get_llm_strategy(
    llm_instructions: str, output_format: Type[BaseModel]
) -> LLMExtractionStrategy:
    """
    Returns the configuration for the language model extraction strategy.

    Args:
        llm_instructions (str): The prompt telling the LLM what to extract.
        output_format (Type[BaseModel]): Pydantic model class describing the
            expected shape of the extracted data.

    Returns:
        LLMExtractionStrategy: The settings for how to extract data using the LLM.
    """
    return LLMExtractionStrategy(
        provider=LLM_MODEL,  # Name of the LLM provider/model
        api_token=API_TOKEN,  # API token for the provider
        schema=output_format.model_json_schema(),  # JSON schema of the data model
        extraction_type="schema",  # Extract structured data matching the schema
        instruction=llm_instructions,  # Instructions for the LLM
        input_format="markdown",  # Feed page content to the LLM as markdown
        verbose=True,  # Enable verbose logging
    )
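

# For reference, `output_format.model_json_schema()` turns a pydantic model
# class into a plain JSON schema dict. A minimal illustrative model (not part
# of this project) such as:
#
#     class Business(BaseModel):
#         name: str
#         phone: str
#
# yields roughly:
#
#     {"title": "Business", "type": "object",
#      "properties": {"name": {"title": "Name", "type": "string"},
#                     "phone": {"title": "Phone", "type": "string"}},
#      "required": ["name", "phone"]}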


async def check_no_results(
    crawler: AsyncWebCrawler,
    url: str,
    session_id: str,
) -> bool:
    """
    Checks if the "No Results Found" message is present on the page.

    Args:
        crawler (AsyncWebCrawler): The web crawler instance.
        url (str): The URL to check.
        session_id (str): The session identifier.

    Returns:
        bool: True if the "No Results Found" message is found, False otherwise.
    """
    result = await crawler.arun(
        url=url,
        config=CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,  # Always fetch a fresh copy of the page
            session_id=session_id,
        ),
    )

    if result.success:
        if "No Results Found" in result.cleaned_html:
            return True
    else:
        print(
            f"Error fetching page for 'No Results Found' check: {result.error_message}"
        )

    return False


async def fetch_and_process_page(
    crawler: AsyncWebCrawler,
    page_number: int,
    base_url: str,
    css_selector: str,
    llm_strategy: LLMExtractionStrategy,
    session_id: str,
    seen_names: Set[str],
) -> Tuple[List[dict], bool]:
    """
    Fetches and processes a single page of Yellow Pages results.

    Args:
        crawler (AsyncWebCrawler): The web crawler instance.
        page_number (int): The page number to fetch.
        base_url (str): The base URL of the website, containing a
            `{page_number}` placeholder.
        css_selector (str): The CSS selector to target the content.
        llm_strategy (LLMExtractionStrategy): The LLM extraction strategy.
        session_id (str): The session identifier.
        seen_names (Set[str]): Set of business names that have already been seen.

    Returns:
        Tuple[List[dict], bool]:
            - List[dict]: A list of processed businesses from the page.
            - bool: A flag indicating if the "No Results Found" message was encountered.
    """
    url = base_url.format(page_number=page_number)
    print(f"Loading page {page_number}...")

    # Stop early if the site reports that there are no results.
    no_results = await check_no_results(crawler, url, session_id)
    if no_results:
        return [], True

    # Fetch the page and run the LLM extraction on the selected content.
    result = await crawler.arun(
        url=url,
        config=CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,  # Always fetch a fresh copy of the page
            extraction_strategy=llm_strategy,
            css_selector=css_selector,
            session_id=session_id,
        ),
    )

    if not (result.success and result.extracted_content):
        print(f"Error fetching page {page_number}: {result.error_message}")
        return [], False

    extracted_data = json.loads(result.extracted_content)
    if not extracted_data:
        print(f"No businesses found on page {page_number}.")
        return [], False

    print("Extracted data:", extracted_data)

    all_businesses = []
    for business in extracted_data:
        print("Processing business:", business)

        # The extraction strategy tags each block with an "error" flag;
        # drop it from successfully extracted records.
        if business.get("error") is False:
            business.pop("error", None)

        # Skip businesses whose names have already been collected.
        if is_duplicated(business["name"], seen_names):
            print(f"Duplicate business '{business['name']}' found. Skipping.")
            continue

        seen_names.add(business["name"])
        all_businesses.append(business)

    if not all_businesses:
        print(f"No complete businesses found on page {page_number}.")
        return [], False

    print(f"Extracted {len(all_businesses)} businesses from page {page_number}.")
    return all_businesses, False
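

if __name__ == "__main__":
    # Minimal usage sketch showing how the helpers above fit together for a
    # single page. The `BusinessData` model, URL template, CSS selector, and
    # session id below are illustrative placeholders, not values from this
    # project.
    import asyncio

    class BusinessData(BaseModel):
        name: str
        address: str
        phone: str

    async def demo() -> None:
        llm_strategy = get_llm_strategy(
            llm_instructions="Extract each business's name, address, and phone number.",
            output_format=BusinessData,
        )
        seen_names: Set[str] = set()
        async with AsyncWebCrawler(config=get_browser_config()) as crawler:
            businesses, no_results = await fetch_and_process_page(
                crawler,
                page_number=1,
                base_url="https://www.yellowpages.com/search?page={page_number}",
                css_selector=".result",
                llm_strategy=llm_strategy,
                session_id="demo_session",
                seen_names=seen_names,
            )
            print(f"Collected {len(businesses)} businesses (no_results={no_results}).")

    asyncio.run(demo())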