diff --git "a/maps2.py" "b/maps2.py" --- "a/maps2.py" +++ "b/maps2.py" @@ -1,681 +1,1218 @@ -# --- Google Colab で実行する場合、最初に以下のセルを実行してください --- -# !pip install selenium google_colab_selenium webdriver-manager beautifulsoup4 pandas gradio - import gradio as gr -import pandas as pd -from selenium import webdriver -from selenium.webdriver.chrome.options import Options -from selenium.common.exceptions import WebDriverException, TimeoutException -try: - import google_colab_selenium as gs # Colab用 - IN_COLAB = True -except : - IN_COLAB = False +import os +import csv +import json import time from bs4 import BeautifulSoup -from urllib.parse import urljoin # 相対URLを絶対URLに変換するために使用 -import traceback # エラー詳細表示用 -import os # ファイルパス操作用 -import shutil # ファイルコピー用 (Drive同期) - -# --- Google Drive マウント (Colab環境のみ) --- -DRIVE_MOUNT_POINT = '/content/drive/MyDrive/' -drive = None # Drive オブジェクトを保持 (Colab用) - -# --- 停止フラグ --- -stop_requested = False - -def setup_drive_mount(log_container): - """Colab環境の場合、Google Driveをマウントする""" - global drive - if IN_COLAB: - from google.colab import drive as colab_drive - try: - if not os.path.exists('/content/drive'): - log_container.append("Google Driveをマウントします。認証が必要です...") - print("Google Driveをマウントします。認証が必要です...") - drive = colab_drive - drive.mount('/content/drive') - log_container.append("Google Driveのマウント完了。") - print("Google Driveのマウント完了。") - else: - log_container.append("Google Driveは既にマウントされています。") - print("Google Driveは既にマウントされています。") - # 既存のマウントでも drive オブジェクトをセット - drive = colab_drive - return True, "" - except Exception as e: - error_msg = f"Google Driveのマウント中にエラーが発生しました: {e}\n{traceback.format_exc()}" - log_container.append(error_msg) - print(error_msg) - drive = None # エラー時はNoneに戻す - return False, error_msg - else: - log_container.append("Colab環境ではないため、Driveマウントはスキップします。") - print("Colab環境ではないため、Driveマウントはスキップします。") - return False, "Not in Colab environment." - - -# --- 定数 --- -LOCAL_CSV_FILENAME = "pitact_企業リスト_ローカル一時保存.csv" -CSV_HEADERS = ["会社名", "住所", "ホームページURL", "設立日", "Pitact 詳細URL"] - -# --- Selenium WebDriver 設定 --- -def setup_driver(log_container): - """Selenium WebDriverのインスタンスを設定して返す""" - log_container.append("WebDriverを設定中...") - print("WebDriverを設定中...") - chrome_options = Options() - chrome_options.add_argument('--headless') - chrome_options.add_argument('--no-sandbox') - chrome_options.add_argument('--disable-dev-shm-usage') - chrome_options.add_argument('--log-level=3') - chrome_options.add_experimental_option('excludeSwitches', ['enable-logging']) - try: - if IN_COLAB and 'google_colab_selenium' in globals(): - log_container.append("Colab環境としてWebDriverを起動します。") - print("Colab環境としてWebDriverを起動します。") - driver = gs.Chrome(options=chrome_options) - else: - log_container.append("ローカル環境でのWebDriver起動を試みます。") - print("ローカル環境でのWebDriver起動を試みます。") - try: - from webdriver_manager.chrome import ChromeDriverManager - from selenium.webdriver.chrome.service import Service - service = Service(ChromeDriverManager().install()) - driver = webdriver.Chrome(service=service, options=chrome_options) - log_container.append("webdriver-manager を使用してWebDriverを起動しました。") - print("webdriver-manager を使用してWebDriverを起動しました。") - except ImportError: - log_container.append("webdriver-manager が見つかりません。パスが通っているChromeDriverを使用します。") - print("webdriver-manager が見つかりません。パスが通っているChromeDriverを使用します。") - driver = webdriver.Chrome(options=chrome_options) - - driver.set_page_load_timeout(45) - log_container.append("WebDriverの設定完了。") - print("WebDriverの設定完了。") - return driver - except Exception as e: - error_msg = f"WebDriverの設定中にエラーが発生しました: {e}\n{traceback.format_exc()}" - log_container.append(error_msg) - print(error_msg) - return None - -def get_html_with_driver(driver, url, log_container, wait_time=5): - """既存のSeleniumドライバを使用して指定されたURLからHTMLを取得する。""" - if not driver: - error_msg = "WebDriverが利用できません。\n" - log_container.append(error_msg) - return None, error_msg - log_message = "" +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException, ElementClickInterceptedException +import re +from urllib.parse import urlparse, urljoin +import traceback +import io +import contextlib +from datetime import datetime +import threading # スレッド中断のために追加 + +# --- WebDriverの選択 --- +IN_COLAB = 'google.colab' in str(get_ipython()) if 'get_ipython' in globals() else False +if IN_COLAB: + print("Google Colab環境を検出。google_colab_selenium を使用します。") + try: import google_colab_selenium as gs + except ImportError: print("google_colab_seleniumが見つかりません。!pip install google-colab-selenium を実行してください。"); gs = None +else: + print("ローカル環境を検出。通常の selenium webdriver を使用します。") + from selenium import webdriver + gs = None try: - log_msg = f"アクセス中: {url}" - log_container.append(log_msg) - print(log_msg) - driver.get(url) - - log_msg = f"ページ読み込み完了。{wait_time}秒待機..." - log_container.append(log_msg) - print(log_msg) - # 停止リクエストがあれば待機を短縮またはスキップすることも可能だが、 - # ここではシンプルに待機は完了させる - time.sleep(wait_time) - - html = driver.page_source - log_msg = f"{url} からHTMLを正常に取得しました。" - log_container.append(log_msg) - print(log_msg) - return html, log_message - except TimeoutException: - error_msg = f"ページの読み込みがタイムアウトしました: {url}" - log_container.append(error_msg) - print(error_msg) - return None, error_msg - except WebDriverException as e: - # 停止リクエストによる中断の場合、WebDriverExceptionが発生することがある - if stop_requested: - log_container.append("処理停止リクエストによりWebDriver操作が中断された可能性があります。") - return None, "WebDriver操作中断" - error_msg = f"WebDriverエラーが発生しました ({url}): {e}" - log_container.append(error_msg) - print(error_msg) - return None, error_msg - except Exception as e: - error_msg = f"予期せぬエラーが発生しました ({url}): {e}\n{traceback.format_exc()}" - log_container.append(error_msg) - print(error_msg) - return None, error_msg - -# --- BeautifulSoup HTML解析 (変更なし) --- -def parse_listing_page(html_content, log_container): - """企業一覧ページのHTMLを解析し、会社名と詳細ページURLのリストを取得する。""" - if not html_content: - error_msg = "一覧ページのHTMLコンテンツが空です。" - log_container.append(error_msg) - return [], error_msg - log_message = "" - soup = BeautifulSoup(html_content, 'html.parser') - company_list = [] - base_url = "https://pitact.com" # 相対URL解決用 - - company_boxes = soup.find_all('div', class_='box cat_01') - if not company_boxes: - warn_msg = "一覧ページで企業情報が見つかりませんでした(要素 '.box.cat_01')。" - log_container.append(warn_msg) - return [], warn_msg - - log_msg = f"{len(company_boxes)} 件の会社情報を検出。" - # log_container.append(log_msg) # ログが多すぎるのでコメントアウト - # print(log_msg) - - for box in company_boxes: - # --- 停止チェック --- - if stop_requested: - log_container.append("停止リクエスト検知 (一覧ページ解析中)。") - break # このページの残りの解析を中断 - - company_data = {"会社名": "取得失敗", "住所": "取得失敗", "Pitact 詳細URL": "取得失敗"} - name_tag_h2 = box.find('h2', class_='list_tit') - if name_tag_h2 and name_tag_h2.a: - company_data["会社名"] = name_tag_h2.a.get_text(strip=True) - detail_url_relative = name_tag_h2.a.get('href') - if detail_url_relative: - company_data["Pitact 詳細URL"] = urljoin(base_url, detail_url_relative) - else: - log_container.append(f"警告: 会社名/詳細URLタグ ('h2.list_tit > a') が見つからない要素がありました。") - - options_div = box.find('div', class_='list_options') - address_found_in_list = False - if options_div: - details = options_div.find_all('dl') - for detail in details: - dt, dd = detail.find('dt'), detail.find('dd') - if dt and dd and "住所" in dt.get_text(strip=True): - address_text = dd.get_text(strip=True) - if address_text.startswith("〒") and len(address_text.split(maxsplit=1)) > 1: - address_text = address_text.split(maxsplit=1)[1].strip() - company_data["住所"] = address_text - address_found_in_list = True - break - - if company_data["Pitact 詳細URL"] != "取得失敗": - company_list.append(company_data) - else: - log_container.append(f"警告: '{company_data.get('会社名', '不明')}' の詳細URLが取得できませんでした。この会社はスキップされます。") - - return company_list, log_message - -def parse_detail_page(html_content, log_container): - """企業詳細ページのHTMLを解析し、必要な情報(ホームページURL、設立日、住所)を抽出する。""" - if not html_content: - error_msg = "詳細ページのHTMLコンテンツが空です。" - log_container.append(error_msg) - return {}, error_msg - log_message = "" - soup = BeautifulSoup(html_content, 'html.parser') - details = {"ホームページURL": "取得失敗", "設立日": "取得失敗", "住所": "取得失敗"} - - info_table = soup.find('table', class_='table is-bordered is-fullwidth responsive-stack') - if not info_table: - warn_msg = "詳細ページで企業基本情報テーブルが見つかりませんでした(要素 'table.table.is-bordered...')。" - log_container.append(warn_msg) - return details, warn_msg - - rows = info_table.find_all('tr') - found_count = 0 - for row in rows: - # --- 停止チェック --- - # 詳細ページの解析は比較的速いので、ここではチェックしない方がスムーズかも - # if stop_requested: - # log_container.append("停止リクエスト検知 (詳細ページ解析中)。") - # break - - th_tag = row.find('th') - td_tag = row.find('td') - - if th_tag and td_tag: - header = th_tag.get_text(strip=True) - value_td = td_tag - - if "ホームページURL" in header: - link_tag = value_td.find('a') - if link_tag and link_tag.has_attr('href') and link_tag['href'].startswith('http'): - details["ホームページURL"] = link_tag['href'] - else: - url_text = value_td.get_text(strip=True) - if url_text and url_text != '--': - details["ホームページURL"] = url_text - else: - details["ホームページURL"] = "取得失敗" - found_count += 1 - - elif "設立日" in header: - value_text = value_td.get_text(strip=True).replace('--','').strip() - details["設立日"] = value_text if value_text else "取得失敗" - found_count += 1 - - elif "住所" in header: - address_text = "" - for content in value_td.contents: - if isinstance(content, str): - address_text += content.strip() - elif content.name == 'br': - address_text += " " - elif content.name == 'a' and 'map' in content.get('href', ''): - pass - address_text = ' '.join(address_text.split()) - if address_text.startswith("〒") and len(address_text.split(maxsplit=1)) > 1: - address_text = address_text.split(maxsplit=1)[1].strip() - details["住所"] = address_text if address_text else "取得失敗" - found_count += 1 - - return details, log_message - -# --- 逐次保存用 CSV書き込み関数 (変更なし) --- -def append_to_csv(data_dict, filename, headers, log_container): + from selenium.webdriver.chrome.service import Service as ChromeService + from webdriver_manager.chrome import ChromeDriverManager + except ImportError: + print("webdriver-manager が見つかりません。 `pip install webdriver-manager` を実行してください。") + ChromeService = None + ChromeDriverManager = None + +# --- 中断フラ��� --- +# スレッドセーフな中断イベントを使用 +interrupt_event = threading.Event() + +# --- Helper Functions --- +def find_prefixed_data_string(data_structure): + """データ構造内から ")]}'\n" で始まる文字列を見つける(再帰的検索)""" + if isinstance(data_structure, str) and data_structure.startswith(")]}'\n"): + return data_structure + elif isinstance(data_structure, list): + for item in data_structure: + if interrupt_event.is_set(): return None # 中断チェック + found = find_prefixed_data_string(item) + if found: + return found + elif isinstance(data_structure, dict): + for value in data_structure.values(): + if interrupt_event.is_set(): return None # 中断チェック + found = find_prefixed_data_string(value) + if found: + return found + return None + +def find_details_data_by_id_or_heuristic(data_list, place_id=None): """ - 辞書データをCSVファイルに追記する。 - ファイルが存在しない場合や空の場合はヘッダーを書き込む。 + JSONデータリストから詳細情報を含む可能性のあるリストを特定する。 + place_idがあればそれを優先し、なければヒューリスティック(住所形式など)で探す。 """ - file_exists = os.path.isfile(filename) - is_empty = not file_exists or os.path.getsize(filename) == 0 - - df_to_append = pd.DataFrame([data_dict]) - for col in headers: - if col not in df_to_append.columns: - df_to_append[col] = "取得失敗" - df_to_append = df_to_append[headers] - - try: - df_to_append.to_csv( - filename, - mode='a', - header=is_empty, - index=False, - encoding='utf-8-sig' - ) - return True, "" - except Exception as e: - error_msg = f"警告: '{data_dict.get('会社名', '不明')}' の情報をCSV '{os.path.basename(filename)}' に追記中にエラー: {e}" - log_container.append(error_msg) - print(error_msg) - return False, error_msg + if not isinstance(data_list, list): + return None + if interrupt_event.is_set(): return None # 中断チェック + + potential_candidates = [] + for item in data_list: + if interrupt_event.is_set(): return None # 中断チェック + # 詳細データは通常、要素数が比較的多いリスト形式 + if not isinstance(item, list) or len(item) < 30: + continue + + is_candidate = False + # place_id が指定されていれば、リスト内にそのIDが含まれるかチェック + if place_id and place_id in str(item): + is_candidate = True + # place_id がない場合は、住所らしき情報が含まれるかヒューリスティックにチェック + elif not place_id: + has_address_like = any( + isinstance(sub, str) and + ("〒" in sub or + any(k in sub for k in ["都", "道", "府", "県", "市", "区", "町", "村", "丁目", "番地", "号"]) or + re.search(r'\d+-\d+-\d+', sub)) + for sub in item + ) + if has_address_like: + is_candidate = True -# --- Google Drive 同期ヘルパー関数 (変更なし) --- -def sync_file_to_drive(local_file_path, gdrive_target_file_path, log_container): - """ - 指定されたローカルファイルをGoogle Driveのターゲットファイルパスにコピーする。 - Colab環境で、Driveがマウントされ、ローカルファイルが存在する場合のみ実行。 - """ - if not IN_COLAB or not drive or not os.path.exists(local_file_path): - return False, "Drive同期の前提条件を満たしていません(非Colab、未マウント、ローカルファイル欠損など)。" + if is_candidate: + potential_candidates.append(item) - try: - gdrive_target_dir_path = os.path.dirname(gdrive_target_file_path) - if not os.path.exists(gdrive_target_dir_path): - log_msg = f"Drive側ディレクトリ作成試行: {gdrive_target_dir_path}" - # log_container.append(log_msg) - # print(log_msg) - os.makedirs(gdrive_target_dir_path, exist_ok=True) - # log_container.append(f"Drive側ディレクトリ '{os.path.basename(gdrive_target_dir_path)}' 作成完了 (または既存)。") - - shutil.copy2(local_file_path, gdrive_target_file_path) - sync_msg = f"-> Drive同期成功: {os.path.basename(local_file_path)} -> {gdrive_target_file_path.replace(DRIVE_MOUNT_POINT, 'MyDrive:/', 1)}" - print(sync_msg) # 同期成功はコンソールのみ表示 - return True, "" - - except Exception as e_sync: - error_msg = f"★★★★★ Google Driveへのファイル同期中にエラー ★★★★★\n" - error_msg += f" ローカルファイル: {local_file_path}\n" - error_msg += f" Driveターゲットパス: {gdrive_target_file_path}\n" - error_msg += f" エラー: {type(e_sync).__name__}: {e_sync}\n" - error_msg += " 警告: このファイルの同期に失敗しました。" - log_container.append(error_msg) - print(error_msg) - return False, error_msg - -# --- 停止リクエスト処理関数 --- -def request_stop(): - """停止フラグを立て、ログにメッセージを表示する""" - global stop_requested - if not stop_requested: # 複数回押されてもログは1回だけ - print("停止リクエストを受け付けました。") - stop_requested = True - # ログ表示エリアにもメッセージを追加したいが、 - # この関数はフラグを立てるだけにして、scrape関数内でログを出す方が良い - # return "停止リクエストを受け付けました。現在のイテレーション完了後に停止を試みます..." - # else: - # return "既に停止リクエスト受付済みです。" - return "停止リクエスト受付済み。処理中断を試みます..." # Gradioボタンのフィードバック用 - -# --- スクレイピング実行関数 (停止チェック追加) --- -def scrape_pitact_gradio(base_url, drive_filename, progress=gr.Progress(track_tqdm=True)): - """ - Gradioインターフェースから呼び出されるスクレイピング処理関数。 - 停止リクエストがあれば中断する。 - """ - global stop_requested - stop_requested = False # 開始時に必ずリセット - - log_output_list = ["処理を開始します..."] - all_company_data_memory = [] - page_num = 1 - total_companies_processed = 0 - driver = None - local_output_path = os.path.join(".", LOCAL_CSV_FILENAME) - gdrive_output_path = None - final_drive_file_ref = None - - # --- 1. Drive マウント --- - drive_mounted, mount_msg = setup_drive_mount(log_output_list) - if IN_COLAB and not drive_mounted: - log_output_list.append("エラー: Google Driveのマウントに失敗したため、処理を中止します。") - return pd.DataFrame(), local_output_path, None, "\n".join(log_output_list) - if IN_COLAB and drive_filename: - safe_drive_filename = "".join(c if c.isalnum() or c in ('_', '-', '.') else '_' for c in drive_filename) - if not safe_drive_filename.lower().endswith('.csv'): - safe_drive_filename += '.csv' - gdrive_output_path = os.path.join(DRIVE_MOUNT_POINT, safe_drive_filename) - log_output_list.append(f"Google Drive 保存先: {gdrive_output_path.replace(DRIVE_MOUNT_POINT, 'MyDrive:/', 1)}") - elif IN_COLAB and not drive_filename: - log_output_list.append("警告: Driveファイル名が指定されていません。Driveへの保存は行われません。") - gdrive_output_path = None - else: - gdrive_output_path = None + if not potential_candidates: + return None - # --- 2. 入力URL検証 --- - if not base_url or not base_url.startswith("https://pitact.com/search/"): - log_output_list.append("エラー: 無効なPitactの検索URLです。") - return pd.DataFrame(), local_output_path, None, "\n".join(log_output_list) + # 候補が1つならそれを返す + if len(potential_candidates) == 1: + return potential_candidates[0] - log_output_list.append(f"対象URL: {base_url}") - log_output_list.append(f"ローカル保存先: '{os.path.basename(local_output_path)}'") - if gdrive_output_path: - log_output_list.append(f"Drive同期先: '{os.path.basename(gdrive_output_path)}'") + # 候補が複数ある場合、スコアリングで最もそれらしいものを選ぶ + best_candidate = None + max_score = -1 - # --- 3. ローカルCSVファイル初期化 --- + for candidate in potential_candidates: + if interrupt_event.is_set(): return None # 中断チェック + score = len(candidate) # 要素数が多いほど詳細情報の可能性が高い + try: + # 特定のインデックスにリストが存在するか(構造的な特徴) + if any(isinstance(candidate[idx], list) and candidate[idx] for idx in [7, 13, 178] if idx < len(candidate)): + score += 50 + # URLらしき文字列が含まれるか + if 7 < len(candidate) and isinstance(candidate[7], list) and len(candidate[7]) > 0 and isinstance(candidate[7][0], str) and candidate[7][0].startswith('http'): + score += 50 + # 別の構造的な特徴 + if 34 < len(candidate) and isinstance(candidate[34], list) and candidate[34]: + score += 30 + except Exception: + # スコアリング中のエラーは無視 + pass + + if score > max_score: + max_score = score + best_candidate = candidate + + return best_candidate + + +def is_domain_like(text): + """文字列がドメイン名らしい形式か簡易的に判定""" + if not isinstance(text, str): return False + text = text.strip().lower() + common_tlds = ['.com', '.jp', '.co.jp', '.net', '.org', '.info', '.biz'] + # URLスキーマ、パス、特殊文字、全角文字、IPアドレス形式、前後のドット、連続ドットは除外 + if re.search(r'^(https?|ftp)://|[/\\?#\s\u3000-\uFFFF:;@!$%^*()=+]', text): return False + if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', text): return False + if text.startswith('.') or text.endswith('.') or '..' in text: return False + # ドットを含み、一般的なTLDで終わるかチェック + return '.' in text and any(text.endswith(tld) for tld in common_tlds) + +def safe_get(data, index, default=None): + """ネストされたリストや辞書から安全に値を取得する""" + if isinstance(index, int): + try: + return data[index] if isinstance(data, list) and index < len(data) else default + except IndexError: + return default + elif isinstance(index, list): # インデックスのリストでネストされた要素を取得 + current = data + for idx in index: + if interrupt_event.is_set(): return default # 中断チェック + try: + if isinstance(current, list) and isinstance(idx, int) and idx < len(current): + current = current[idx] + elif isinstance(current, dict) and idx in current: + current = current[idx] + else: + return default # 途中でリスト/辞書でない、またはインデックス/キーが存在しない場合 + except (IndexError, KeyError, TypeError): + return default # その他の予期せぬエラー + return current + elif isinstance(index, str): # 文字列インデックスは辞書のキーとして扱う + return data.get(index, default) if isinstance(data, dict) else default + return default + +# --- 中断チェック付き時間待機関数 --- +def interruptible_sleep(duration): + """指定された時間待機するが、中断イベントが発生したら即座に終了する""" + interrupt_event.wait(timeout=duration) + # waitはタイムアウトするかイベントがセットされると戻る + # 呼び出し元で interrupt_event.is_set() をチェックする必要がある + +# --- HTML抽出関数 (本文抽出を span.wiI7pd 優先に変更、中断チェック追加) --- +def extract_details_and_reviews_from_html(html_content): + """詳細HTMLから基本情報と口コミ情報を抽出 (本文は span.wiI7pd 優先、中断チェックあり)""" + print(" [HTML Extractor - Details & Reviews (wiI7pd priority)] 開始") + soup = BeautifulSoup(html_content, 'lxml' if 'lxml' in globals() else 'html.parser') + details = {"name": "N/A", "url": "", "phone": "N/A", "address": "N/A", "links": {}, "reviews": [], "extraction_error": None} try: - if os.path.exists(local_output_path): - os.remove(local_output_path) - log_output_list.append(f"既存のローカルファイル '{os.path.basename(local_output_path)}' を削除しました。") - pd.DataFrame(columns=CSV_HEADERS).to_csv(local_output_path, index=False, encoding='utf-8-sig') - log_output_list.append(f"ローカル出力ファイル '{os.path.basename(local_output_path)}' を初期化しました。") + # --- 基本情報の抽出 --- + if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") + main_container_selector = '.aIFcqe' + main_container = soup.select_one(main_container_selector) + search_root = soup # デフォルトはページ全体 + if main_container: + print(f" '{main_container_selector}' コンテナ発見。基本情報を抽出。") + search_root = main_container + else: + print(f" 警告: '{main_container_selector}' コンテナが見つかりません。ページ全体から基本情報を抽出。") + + # 名前 (h1タグを探す) + if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") + name_tag = search_root.find('h1') + if name_tag: + details['name'] = name_tag.get_text(strip=True) + elif details['name'] == 'N/A': # フォールバックでから取得 + title_tag = soup.find('title') + if title_tag and title_tag.string: + title_text = title_tag.string.replace('- Google マップ', '').strip() + if title_text.lower() != "google マップ": details["name"] = title_text + + # 電話、住所、ウェブサイトなどの情報を抽出 + selectors_map = { + "phone": ['button[data-item-id^="phone:tel:"]', 'div.Io6YTe', 'button[aria-label*="電話番号"]'], + "address": ['button[data-item-id="address"]', 'div.rogA2c', 'button[aria-label*="住所"]'], + "website": ['a[data-item-id="authority"][href^="http"]', 'button[data-item-id="authority"]', 'a[aria-label*="ウェブサイト"][href^="http"]'], + "other_link": ['a.CsEnBe[href^="http"]'] # 公式サイト以外のリンク + } + + for info_type, selectors in selectors_map.items(): + if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") + found_val = None + for selector in selectors: + if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") + element = search_root.select_one(selector) + # コンテナ内で見つからなければページ全体で再検索 + if not element and search_root != soup: + element = soup.select_one(selector) + + if element: + data_item_id = element.get('data-item-id', '') + aria_label = element.get('aria-label', '') + element_text = element.get_text(strip=True) + href = element.get('href') + + if info_type == "phone": + phone_num = None + if data_item_id.startswith('phone:tel:'): phone_num = data_item_id.split(':')[-1] + elif "電話番号:" in aria_label: phone_num = re.search(r'([\d-]+)', aria_label.split("電話番号:")[-1]) + elif element.name == 'div' and re.match(r'^[\d\s-]+$', element_text): phone_num = element_text + # 電話番号形式の整形と検証 + if isinstance(phone_num, str): phone_num = phone_num.strip() + elif hasattr(phone_num, 'group'): phone_num = phone_num.group(1).strip() + if phone_num and re.match(r'^[\d-]+$', phone_num.replace('ー','-')): + found_val = phone_num.replace('ー','-') + break # 電話番号が見つかったらループ脱出 + elif info_type == "address": + addr_text = None + if data_item_id == 'address': addr_text = element_text + elif "住所:" in aria_label: addr_text = aria_label.split("住所:")[-1].split('(新しいウィンドウで開きます)')[0].strip() + elif element.name == 'div' and ("〒" in element_text or any(k in element_text for k in ["都","道","府","県","市","区","町","村"])): addr_text = element_text + # 住所らしき文字列か簡易チェック + if addr_text and len(addr_text) > 5: # ある程度の長さがあるか + found_val = addr_text + break # 住所が見つかったらループ脱出 + elif info_type == "website" or info_type == "other_link": + if href and href.startswith('http') and 'google.com' not in urlparse(href).netloc: # Google自身のリンクは除外 + link_name = "N/A"; is_website = False + # リンクの種類を判別 + if data_item_id == 'authority' or "ウェブサイト" in aria_label: + link_name = element_text if is_domain_like(element_text) else "ウェブサイト" + is_website = True + elif info_type == "other_link": + link_name = f"リンク ({element_text})" if element_text else "外部リンク" + elif is_domain_like(element_text): # ドメイン名らしきテキストの場合 + link_name = element_text + + if link_name != "N/A": + normalized_url = href.rstrip('/') + # 重複を避けて links 辞書に追加 + if not any(existing_url.rstrip('/') == normalized_url for existing_url in details["links"].values()): + details["links"][link_name] = href + # website タイプで見つかったものを優先的にメインURL候補へ (まだ未設定の場合) + if is_website and details["url"] == "": + details["url"] = href + # website タイプならこのセレクタでの探索は終了 + if info_type == "website": + found_val = href # 見つかったことを示す + break # websiteセレクタのループ脱出 + + # 各タイプの最初の有効な値を details に格納 (other_link は除く) + if found_val and info_type in details and info_type != "other_link": + details[info_type] = found_val + + # メインURLがまだ決まっていない場合、links 辞書から探す + if details["url"] == "": + priority = ["ウェブサイト", "authority"] # 公式サイトらしき名前を優先 + found_url_in_links = False + for p_word in priority: + if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") + for name, url in details["links"].items(): + if p_word in name.lower(): + details["url"] = url + found_url_in_links = True + break + if found_url_in_links: + break + # それでも見つからなければ、ドメイン名らしきリンク > 最初のリンク + if not found_url_in_links: + domain_link = next((url for name, url in details["links"].items() if is_domain_like(name)), None) + if domain_link: + details["url"] = domain_link + elif details["links"]: # linksに何かあれば最初のものをURLとする + details["url"] = next(iter(details["links"].values())) + print(f" 基本情報抽出完了: Name='{details['name']}'") + + + # --- 口コミ情報の抽出 --- + print(" 口コミ情報抽出開始 (span.wiI7pd 優先)...") + review_container_selector = 'div.GHT2ce.NsCY4' + review_container = soup.select_one(review_container_selector) + if review_container: + print(f" '{review_container_selector}' 口コミコンテナ発見。") + # 口コミカードの特定 (jftiEf or MyEned) + review_card_selectors = ['div.jftiEf', 'div.MyEned'] + review_cards = [] + for sel in review_card_selectors: + if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") + review_cards = review_container.select(sel) + if review_cards: + print(f" 口コミカードセレクタ '{sel}' で {len(review_cards)} 件発見。") + break + if not review_cards: + print(" 警告: 口コミコンテナ内で口コミカードが見つかりません。") + + extracted_reviews = [] + for card_idx, card in enumerate(review_cards): + if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") + try: + review_text = "N/A"; reviewer_name = "N/A"; rating = "N/A" + + # 口コミ本文抽出 (span.wiI7pd 優先) + text_span_wiI7pd = card.select_one('span.wiI7pd') + if text_span_wiI7pd: + review_text = text_span_wiI7pd.get_text(strip=True) + else: + # フォールバック: span[jscontroller="MZnM8e"] + full_text_span = card.select_one('span[jscontroller="MZnM8e"]') + if full_text_span: + review_text = full_text_span.get_text(strip=True) + + # 投稿者名 (.d4r55) + name_el = card.select_one('.d4r55'); + if name_el: reviewer_name = name_el.get_text(strip=True) + + # 評価 (.kvMYJc aria-label) + rating_el = card.select_one('.kvMYJc'); + if rating_el: + aria_label = rating_el.get('aria-label', ''); + match = re.search(r'星 (\d+(\.\d+)?)', aria_label) # "星 5.0" などを想定 + if match: rating = match.group(1) + + # 情報が一部でも取れていれば追加 + if review_text != "N/A" or reviewer_name != "N/A": + extracted_reviews.append({"reviewer": reviewer_name, "rating": rating, "text": review_text if review_text != "N/A" else ""}) + + except Exception as e_card: + print(f" 口コミカード {card_idx+1} の解析中にエラー: {e_card}") + extracted_reviews.append({"reviewer": "Error", "rating": "N/A", "text": f"解析エラー: {e_card}"}) + + details['reviews'] = extracted_reviews + print(f" 口コミ抽出完了: {len(details['reviews'])} 件") + else: + print(f" 警告: '{review_container_selector}' 口コミコンテナが見つかりません。") + + except InterruptedError as e_interrupt: # 中断エラーをキャッチ + print(f" HTML解析処理が中断されました: {e_interrupt}") + details['extraction_error'] = "Interrupted" + details['status'] = 'Interrupted' # ステータスも中断にする + except Exception as e_extract: + print(f"★★★★★ HTML抽出処理中にエラーが発生しました ★★★★★") + error_trace = traceback.format_exc() + print(error_trace) + details['extraction_error'] = f"Type: {type(e_extract).__name__}, Msg: {e_extract}\nTrace: {error_trace}" + + print(f" [HTML Extractor - Details & Reviews (wiI7pd priority)] 完了: Name='{details['name']}'") + return details + + +# --- CSV Loading Function (中断チェック追加) --- +def load_queries(csv_path): + """CSVファイルを読み込み、1列目のクエリをリストとして返す(中断チェックあり)""" + queries = [] + encodings_to_try = ['utf-8-sig', 'utf-8', 'cp932', 'shift_jis'] # 試すエンコーディングリスト + file_encoding = None + print(f"CSVファイル読み込み開始: {os.path.basename(csv_path)}") + if not csv_path or not os.path.exists(csv_path): + print("エラー: CSVファイルが見つかりません。") + return [] + + # ファイルのエンコーディングを特定 + for encoding in encodings_to_try: + if interrupt_event.is_set(): print("CSV読み込み中に中断リクエスト検出"); return [] # 中断チェック + try: + with open(csv_path, 'r', encoding=encoding, errors='strict') as f: + f.read(1024) # ファイルの一部を読んでエンコーディングを確認 + file_encoding = encoding + print(f" エンコーディング '{encoding}' で読み込み試行...") + break + except (UnicodeDecodeError, LookupError): + continue # 次のエンコーディングを試す + except Exception as e_enc: + print(f" '{encoding}' 試行中に予期せぬエラー: {e_enc}") + continue + + if not file_encoding: + print(f"エラー: ファイル '{os.path.basename(csv_path)}' を読み込めるエンコーディングが見つかりません。") + return [] + + line_num = 0 + try: + with open(csv_path, 'r', encoding=file_encoding, newline='') as f: + reader = csv.reader(f) + try: + if interrupt_event.is_set(): raise InterruptedError("CSV読み込み中に中断リクエスト") # 中断チェック + header = next(reader) # 最初の行を読み込む + line_num += 1 + print(f" 1行目 (ヘッダー可能性あり): {header}") + except StopIteration: + print("情報: CSVファイルが空です。") + return [] # ファイルが空なら終了 + except InterruptedError as e_interrupt: + print(e_interrupt) + return [] + + # 1行目がヘッダーかどうかを判定 (簡易的) + header_keywords = ['query', 'search', 'keyword', 'クエリ', '検索', 'キーワード', '店舗', '会社'] + first_col_header = header[0].strip().lower() if header else "" + is_header = any(hkw in first_col_header for hkw in header_keywords) + + # 1行目がヘッダーでなく、かつ内容があればクエリとして追加 + if not is_header and header and header[0].strip(): + queries.append(header[0].strip()) + elif is_header: + print(" 1行目はヘッダーと判断しスキップします。") + + # 2行目以降を処理 + for row in reader: + if interrupt_event.is_set(): raise InterruptedError("CSV読み込み中に中断リクエスト") # 中断チェック + line_num += 1 + # 1列目にデータがあればクエリとして追加 + if row and row[0].strip(): + queries.append(row[0].strip()) + # 1列目が空でも他の列にデータがあれば警告を表示 (スキップ対象) + elif any(cell.strip() for cell in row): + print(f"警告: 行 {line_num} の1列目が空です: {row}。スキップします。") + + print(f" CSVから {len(queries)} 件の有効なクエリを抽出しました。") + except InterruptedError as e_interrupt: # 中断をキャッチ + print(e_interrupt) + print(f"中断リクエストにより、{len(queries)} 件のクエリまで読み込みました。") + return queries # 途中までのクエリを返す except Exception as e: - log_output_list.append(f"エラー: ローカル出力ファイルの初期化/削除に失敗: {e}") - return pd.DataFrame(), local_output_path, None, "\n".join(log_output_list) + # CSV処理中のエラーハンドリング + print(f"★★★★★ CSVファイル処理中にエラー (行 {line_num}) ★★★★★") + print(f"エラータイプ: {type(e).__name__}") + print(f"エラーメッセージ: {e}") + print("--- スタックトレース ---") + print(traceback.format_exc()) + print("----------------------") + return [] # エラー発生時は空リストを返す + return queries + + +# --- Single Query Processing Function (中断チェック強化) --- +def process_single_query_full_list(driver, query, query_index, output_dir, wait_config): + """単一クエリ処理: 検索→リストスクロール→リンク抽出→詳細ページ→口コミタブ→口コミスクロール→「もっと見る」クリック→HTML取得→解析 (中断チェックあり)""" + print(f"\n--- クエリ処理開始 [Index:{query_index}] ---: {query}") + results_list = [] + safe_query_part = re.sub(r'[\\/*?:"<>|]', '_', query)[:30].strip() or "empty_query" + base_url = "https://www.google.com/maps/" + + # 待機時間設定 + WAIT_TIME_BASE = wait_config['base'] + WAIT_TIME_DETAIL = wait_config['detail'] + WAIT_TIME_SEARCH = wait_config['search'] + # スクロール設定 + SCROLL_PAUSE_TIME = max(1.5, WAIT_TIME_BASE * 0.5) + MAX_SCROLL_ATTEMPTS = 30 + SCROLL_PAUSE_TIME_REVIEW = max(1.0, WAIT_TIME_BASE * 0.3) + MAX_SCROLL_ATTEMPTS_REVIEW = 500 # 口コミは多い場合があるので回数を増やす + REVIEW_SCROLL_STUCK_LIMIT = 5 # 口コミスクロール停止判定の閾値 - # --- 4. WebDriver起動 --- try: - driver = setup_driver(log_output_list) - if not driver: - log_output_list.append("エラー: WebDriverの起動に失敗しました。処理を中止します。") - return pd.DataFrame(), local_output_path, None, "\n".join(log_output_list) - log_output_list.append("WebDriverの起動成功。") - - # --- 5. メインループ (ページ単位) --- - while True: - # --- 停止チェック (ページ処理開始前) --- - if stop_requested: - log_output_list.append("\n===== 停止リクエストにより処理を中断します =====") - break - - page_log = f"\n===== 一覧ページ {page_num} の処理開始 =====" - log_output_list.append(page_log) - print(page_log) - - current_list_url = base_url if page_num == 1 else f"{base_url}/page-{page_num}" - listing_html, fetch_log = get_html_with_driver(driver, current_list_url, log_output_list, wait_time=6) - - # --- 停止チェック (HTML取得後) --- - if stop_requested: - log_output_list.append("\n===== 停止リクエストにより処理を中断します =====") - break - if not listing_html: - log_output_list.append(f"一覧ページ {page_num} の取得に失敗したため、処理を停止します。") - break - - companies_on_page, parse_log = parse_listing_page(listing_html, log_output_list) - - # --- 停止チェック (一覧解析後) --- - if stop_requested: - log_output_list.append("\n===== 停止リクエストにより処理を中断します =====") - break - if not companies_on_page: - if page_num == 1: - log_output_list.append(f"一覧ページ {page_num} で会社が見つかりませんでした。URL誤りかサイト構造変更の可能性があります。") + # --- 中断チェック --- + if interrupt_event.is_set(): raise InterruptedError("処理開始前に中断リクエスト") + + # 1. 検索実行とリスト表示待機 + search_url = f"https://www.google.com/maps/search/{query.replace(' ', '+')}" + print(f" URLにアクセス: {search_url}") + driver.get(search_url) + if interrupt_event.is_set(): raise InterruptedError("ページ読み込み後に中断リクエスト") + print(f" 検索結果リスト表示待機 (最大{WAIT_TIME_SEARCH}秒)...") + list_container_selector = 'div[role="feed"], div[aria-label*="の検索結果"]' + try: + # WebDriverWait も中断可能にするのは難しいので、ここではそのまま + list_container = WebDriverWait(driver, WAIT_TIME_SEARCH).until( + EC.presence_of_element_located((By.CSS_SELECTOR, list_container_selector)) + ) + WebDriverWait(driver, 10).until( + EC.visibility_of_element_located((By.CSS_SELECTOR, f'{list_container_selector} a[href*="/maps/place/"]')) + ) + print(" 検索結果リスト表示を確認。") + except TimeoutException as e_timeout: + print(f" エラー: 検索結果リストの表示タイムアウト。URL: {search_url}\n{e_timeout}") + print("--- HTML Snapshot (Timeout) ---") + try: print(driver.page_source[:1000]) + except: print(" ページソース取得失敗") + print("--- End Snapshot ---") + results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': search_url, 'html_filename': 'N/A', 'name': f'Error (List Timeout)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: List Timeout'}) + return results_list + except Exception as e_wait: + print(f"★★★★★ リスト待機中に予期せぬエラー ★★★★★\nURL: {search_url}\n{type(e_wait).__name__}: {e_wait}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") + results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': search_url, 'html_filename': 'N/A', 'name': f'Error (List Wait Exception)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: List Wait Exception'}) + return results_list + + # 2. 検索リストのスクロール + print(" 検索リストをスクロールして全結果を表示...") + last_height = driver.execute_script("return arguments[0].scrollHeight", list_container) + scroll_attempts = 0 + stuck_count = 0 + while scroll_attempts < MAX_SCROLL_ATTEMPTS: + if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 中断チェック + try: + driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', list_container) + interruptible_sleep(SCROLL_PAUSE_TIME) # 中断可能な待機 + if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 待機後にもチェック + + new_height = driver.execute_script("return arguments[0].scrollHeight", list_container) + end_markers = driver.find_elements(By.XPATH, "//span[contains(text(), '結果は以上です')] | //p[contains(text(), '結果は以上です')]") + if any(el.is_displayed() for el in end_markers): + print(" 「結果は以上です」表示確認。検索リストスクロール終了。") + break + if new_height == last_height: + stuck_count += 1 + print(f" 検索リストスクロール高さ変化なし ({stuck_count}回目)。再試行...") + interruptible_sleep(SCROLL_PAUSE_TIME * 1.5) # 中断可能な待機 + if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 待機後にもチェック + new_height = driver.execute_script("return arguments[0].scrollHeight", list_container) + if new_height == last_height and stuck_count >= 3: + print(" 高さ変化なしが続いたため、検索リストスクロール終了と判断。") + break else: - log_output_list.append(f"一覧ページ {page_num} で会社が見つかりませんでした。最終ページのようです。") - break - - log_output_list.append(f"一覧ページ {page_num} で {len(companies_on_page)} 件検出。詳細情報取得...") - print(f"一覧ページ {page_num} で {len(companies_on_page)} 件検出。詳細情報取得...") - - # --- 6. 会社詳細ループ (ページ内の会社ごと) --- - desc = f"ページ {page_num}: 詳細取得&保存中" - # tqdm自体のキャンセルは難しいので、ループ内でチェック - for i, company_base_info in enumerate(companies_on_page): - # --- 停止チェック (各会社の処理開始前) --- - if stop_requested: - log_output_list.append("\n===== 停止リクエストにより現在のページの処理を中断します =====") - break # このページの残りの会社の処理を中断 - - total_companies_processed += 1 - company_log = f"\n--- 会社 #{total_companies_processed}: {company_base_info.get('会社名', '名前不明')} ---" - print(company_log) # ログリストには追加しない - - detail_url = company_base_info.get("Pitact 詳細URL") - - if detail_url and detail_url != "取得失敗": - detail_html, detail_fetch_log = get_html_with_driver(driver, detail_url, log_output_list, wait_time=3) - - # --- 停止チェック (詳細HTML���得後) --- - if stop_requested: break - - if detail_html: - extracted_details, detail_parse_log = parse_detail_page(detail_html, log_output_list) - # --- 停止チェック (詳細解析後) --- - if stop_requested: break - - if "住所" in extracted_details and extracted_details["住所"] != "取得失敗": - company_base_info["住所"] = extracted_details["住所"] - if "ホームページURL" in extracted_details: - company_base_info["ホームページURL"] = extracted_details["ホームページURL"] - if "設立日" in extracted_details: - company_base_info["設立日"] = extracted_details["設立日"] + stuck_count = 0 + last_height = new_height + except Exception as e_scroll: + if interrupt_event.is_set(): raise InterruptedError("検索リストスクロールエラー処理中に中断リクエスト") # エラー処理中もチェック + print(f"★★★★★ 検索リストスクロール中にエラー ★★★★★\n{type(e_scroll).__name__}: {e_scroll}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") + print(" スクロールエラー発生。可能な範囲で続行します。") + scroll_attempts += 1 + if scroll_attempts >= MAX_SCROLL_ATTEMPTS: + print(f" 検索リスト最大スクロール回数 ({MAX_SCROLL_ATTEMPTS}) 到達。") + + # 3. リンク抽出 + if interrupt_event.is_set(): raise InterruptedError("リンク抽出前に中断リクエスト") # 中断チェック + print(" 検索結果リストからリンクを抽出...") + unique_place_links = set() + result_card_selector = '.hfpxzc' + try: + list_container_updated = WebDriverWait(driver, 10).until( + EC.presence_of_element_located((By.CSS_SELECTOR, list_container_selector)) + ) + result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) + print(f" '{result_card_selector}' 要素を {len(result_cards)} 件発見。") + + if not result_cards: + print(f" 警告: '{result_card_selector}' が見つかりません。代替セレクタ 'a.hfpxzc' で試行...") + result_card_selector = 'a.hfpxzc' + result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) + print(f" 代替セレクタで {len(result_cards)} 件発見。") + if not result_cards: + print(f" 警告: 代替セレクタ 'a.Nv2PK' で試行...") + result_card_selector = 'a.Nv2PK' + result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) + print(f" 代替セレクタで {len(result_cards)} 件発見。") + + link_extraction_errors = 0 + for card_idx, card in enumerate(result_cards): + if interrupt_event.is_set(): raise InterruptedError("リンク抽出ループ中に中断リクエスト") # 中断チェック + try: + link_element = None + if card.tag_name == 'a': link_element = card else: - log_output_list.append(f"警告: {company_base_info.get('会社名', '不明')} 詳細ページ取得失敗。") - company_base_info.setdefault("ホームページURL", "取得失敗") - company_base_info.setdefault("設立日", "取得失敗") + try: link_element = card.find_element(By.TAG_NAME, 'a') + except NoSuchElementException: continue + + if link_element: + href = link_element.get_attribute('href') + if href and "/maps/place/" in href and not href.startswith("javascript:"): + absolute_href = urljoin(base_url, href) + unique_place_links.add(absolute_href) + except StaleElementReferenceException: + link_extraction_errors += 1 + continue + except Exception as e_extract_link: + print(f"★★★★★ カード {card_idx+1} からのリンク抽出エラー ★★★★★\n{type(e_extract_link).__name__}: {e_extract_link}") + link_extraction_errors += 1 + if link_extraction_errors > 0: + print(f" リンク抽出中に {link_extraction_errors} 件のエラーが発生しました。") + print(f" 抽出したユニークリンク数: {len(unique_place_links)}") + except Exception as e_find_links: + print(f"★★★★★ リンク抽出プロセス全体でエラー ★★★★★\n使用したセレクタ: '{result_card_selector}'\n{type(e_find_links).__name__}: {e_find_links}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") + results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': driver.current_url, 'html_filename': 'N/A', 'name': f'Error (Link Extraction Fail)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: Link Extraction Fail'}) + return results_list + + if not unique_place_links: + print(" 有効な詳細ページリンクが見つかりませんでした。このクエリの結果はありません。") + results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': driver.current_url, 'html_filename': 'N/A', 'name': 'No Results Found', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Success: No Results'}) + return results_list + + # 4. 各リンクの詳細ページを処理 + print(f" {len(unique_place_links)} 件の詳細情報を取得...") + link_list = sorted(list(unique_place_links)) + processed_urls = set() + + for i, place_url in enumerate(link_list, 1): + if interrupt_event.is_set(): raise InterruptedError("詳細ページ処理ループ開始前に中断リクエスト") # 中断チェック + if place_url in processed_urls: continue + processed_urls.add(place_url) + + print(f"\n --- 詳細取得 [Query:{query_index}, Result:{i}/{len(link_list)}] ---") + result_details = {'query_index': query_index, 'original_query': query, 'result_rank': i, 'place_url': place_url, 'html_filename': 'N/A', 'name': 'N/A', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Pending', 'extraction_error': None} - time.sleep(0.4) + try: + print(f" 詳細ページに遷移: {place_url}") + driver.get(place_url) + if interrupt_event.is_set(): raise InterruptedError("詳細ページ読み込み後に中断リクエスト") + WebDriverWait(driver, WAIT_TIME_DETAIL).until( + EC.visibility_of_element_located((By.CSS_SELECTOR, 'h1')) + ) + interruptible_sleep(WAIT_TIME_BASE * 0.2) # 中断可能な待機 + if interrupt_event.is_set(): raise InterruptedError("詳細ページ待機後に中断リクエスト") + + # --- 口コミタブをクリック --- + review_tab_text = "クチコミ" + review_tab_xpath = f"//button[@role='tab'][contains(., '{review_tab_text}') or contains(@aria-label, '{review_tab_text}')]" + review_tab_clicked = False + review_scroll_element = None + try: + print(f" {review_tab_text}タブ クリック試行...") + review_tab = WebDriverWait(driver, 10).until( + EC.element_to_be_clickable((By.XPATH, review_tab_xpath)) + ) + driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", review_tab) + interruptible_sleep(0.3) + if interrupt_event.is_set(): raise InterruptedError("口コミタブクリック前に中断リクエスト") + driver.execute_script("arguments[0].click();", review_tab) + review_tab_clicked = True + print(f" {review_tab_text}タブをクリックしました。口コミコンテナ表示待機...") + + review_container_selector = 'div.GHT2ce.NsCY4' + first_review_card_selector = f'{review_container_selector} div.jftiEf:first-of-type, {review_container_selector} div.MyEned:first-of-type' + + review_scroll_element = WebDriverWait(driver, WAIT_TIME_DETAIL).until( + EC.visibility_of_element_located((By.CSS_SELECTOR, review_container_selector)) + ) + WebDriverWait(driver, 5).until( + EC.visibility_of_element_located((By.CSS_SELECTOR, first_review_card_selector)) + ) + print(f" 口コミコンテナ表示確認、スクロール要素取得。") + interruptible_sleep(WAIT_TIME_BASE * 0.5) + if interrupt_event.is_set(): raise InterruptedError("口コミコンテナ待機後に中断リクエスト") + + except TimeoutException: print(f" 警告: {review_tab_text}タブまたは口コミコンテナの表示タイムアウト。") + except ElementClickInterceptedException: print(f" 警告: {review_tab_text}タブのクリックが遮られました。") + except NoSuchElementException: print(f" 警告: {review_tab_text}タブが見つかりません。") + except Exception as e_click_review: print(f"★★★★★ {review_tab_text}タブ処理中に予期せぬエラー ★★★★★\n{type(e_click_review).__name__}: {e_click_review}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") + + # --- 口コミエリアのスクロール処理 --- + if review_scroll_element: + print(" 口コミエリアをスクロールして全件表示試行...") + review_last_height = driver.execute_script("return arguments[0].scrollHeight", review_scroll_element) + review_scroll_attempts = 0 + review_stuck_count = 0 + while review_scroll_attempts < MAX_SCROLL_ATTEMPTS_REVIEW: + if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 中断チェック + try: + driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', review_scroll_element) + interruptible_sleep(SCROLL_PAUSE_TIME_REVIEW) # 中断可能な待機 + if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 待機後にもチェック + + review_new_height = driver.execute_script("return arguments[0].scrollHeight", review_scroll_element) + if review_new_height == review_last_height: + review_stuck_count += 1 + if review_stuck_count >= REVIEW_SCROLL_STUCK_LIMIT: + print(f" 口コミスクロール高さが{REVIEW_SCROLL_STUCK_LIMIT}回変化なし。スクロール終了と判断。") + break + else: + interruptible_sleep(SCROLL_PAUSE_TIME_REVIEW * 2) # 中断可能な待機 + if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 待機後にもチェック + else: + review_stuck_count = 0 + review_last_height = review_new_height + except Exception as e_review_scroll: + if interrupt_event.is_set(): raise InterruptedError("口コミスクロールエラー処理中に中断リクエスト") + print(f"★★★★★ 口コミスクロール中にエラー ★★★★★\n{type(e_review_scroll).__name__}: {e_review_scroll}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") + print(" 口コミスクロールエラー発生。可能な範囲で続行します。") + break + review_scroll_attempts += 1 + if review_scroll_attempts >= MAX_SCROLL_ATTEMPTS_REVIEW: + print(f" 最大口コミスクロール回数 ({MAX_SCROLL_ATTEMPTS_REVIEW}) 到達。") + print(" 口コミエリアのスクロール完了。") + elif review_tab_clicked: print(" 警告: 口コミスクロール要素が見つからなかったため、口コミスクロールをスキップします。") + + # --- 「もっと見る」ボタンをクリック --- + if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック前に中断リクエスト") + if review_tab_clicked and review_scroll_element: + print(" 「もっと見る」ボタンを検索してクリック試行...") + more_buttons_xpath = "//button[contains(text(), 'もっと見る')]" + clicked_count = 0 + click_attempts = 0 + max_click_attempts = 3 + while click_attempts < max_click_attempts: + if interrupt_event.is_set(): raise InterruptedError("「もっと見る」ループ中に中断リクエスト") # 中断チェック + buttons_found_this_round = 0 + try: + more_buttons = driver.find_elements(By.XPATH, more_buttons_xpath) + if not more_buttons: + if click_attempts == 0: print(" 「もっと見る」ボタンが見つかりませんでした。") + else: print(f" 追加の「もっと見る」ボタンは見つかりませんでした (試行 {click_attempts+1}/{max_click_attempts})。") + break + + print(f" 「もっと見る」ボタンを {len(more_buttons)} 個発見 (試行 {click_attempts+1}/{max_click_attempts})。クリック開始...") + for btn_idx, button in enumerate(more_buttons): + if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") # 中断チェック + try: + if button.is_displayed() and button.is_enabled(): + driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", button) + interruptible_sleep(0.2) + if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") + driver.execute_script("arguments[0].click();", button) + clicked_count += 1 + buttons_found_this_round += 1 + interruptible_sleep(0.3) + if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") + except ElementClickInterceptedException: print(f" ボタン {btn_idx+1} のクリックが遮られました。スキップします。") + except StaleElementReferenceException: print(f" ボタン {btn_idx+1} が古くなりました。スキップします。") + except Exception as e_click_more: print(f" ボタン {btn_idx+1} のクリック中にエラー: {e_click_more}") + + print(f" 今回の試行で {buttons_found_this_round} 個の「もっと見る」ボタンをクリックしました。") + if buttons_found_this_round == 0: + print(" これ以上クリックできる「もっと見る」ボタンはありませんでした。") + break + + except Exception as e_find_more: + if interrupt_event.is_set(): raise InterruptedError("「もっと見る」検索エラー処理中に中断リクエスト") + print(f"★★★★★ 「もっと見る」ボタン検索中にエラー ★★★★★\n{type(e_find_more).__name__}: {e_find_more}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") + break + click_attempts += 1 + if click_attempts < max_click_attempts: + interruptible_sleep(1.0) + if interrupt_event.is_set(): raise InterruptedError("「もっと見る」試行間待機中に中断リクエスト") + + if clicked_count > 0: print(f" 合計 {clicked_count} 個の「もっと見る」ボタンをクリックしました。") + else: print(" クリックされた「もっと見る」ボタンはありませんでした。") + interruptible_sleep(WAIT_TIME_BASE * 0.5) + if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック後に中断リクエスト") + + # --- HTML取得と保存 --- + print(" ページのHTMLを取得・保存中...") + detail_html_content = "" + try: + if interrupt_event.is_set(): raise InterruptedError("HTML取得前に中断リクエスト") + detail_html_content = driver.page_source + temp_name = 'N/A' + try: temp_name = driver.find_element(By.TAG_NAME, 'h1').text + except: pass + safe_place_name_part = re.sub(r'[\\/*?:"<>|]', '_', temp_name)[:20].strip() or "no_name" + tab_suffix = "_reviews_expanded" if review_tab_clicked else "_overview" + detail_html_fname = f"Q{query_index:03d}_R{i:03d}_{safe_place_name_part}_{safe_query_part}_detail{tab_suffix}.html" + detail_html_path = os.path.join(output_dir, detail_html_fname) + with open(detail_html_path, 'w', encoding='utf-8') as f: + f.write(detail_html_content) + result_details['html_filename'] = detail_html_fname + print(f" HTMLを保存しました: {detail_html_fname}") + except Exception as e_save_html: + print(f" HTML取得/保存エラー: {e_save_html}") + result_details['html_filename'] = 'Error Saving HTML' + + # --- HTML解析 --- + if detail_html_content: + print(" HTMLを解析して���報を抽出中...") + if interrupt_event.is_set(): raise InterruptedError("HTML解析前に中断リクエスト") + extracted_info = extract_details_and_reviews_from_html(detail_html_content) + result_details.update(extracted_info) + # 抽出関数内で中断された場合、ステータスが'Interrupted'になっているはず + if result_details.get('status') != 'Interrupted': + if result_details.get('extraction_error'): + result_details['status'] = f"Warning: HTML Extraction Error" + else: + result_details['status'] = 'Success' + print(" HTML解析完了。") else: - log_output_list.append(f"警告: {company_base_info.get('会社名', '不明')} 詳細URL不明。") - company_base_info.setdefault("ホームページURL", "取得失敗") - company_base_info.setdefault("設立日", "取得失敗") - - # --- 停止チェック (CSV保存前) --- - if stop_requested: break - - # --- 7. ローカルCSVへ追記 --- - success_csv, csv_log = append_to_csv(company_base_info, local_output_path, CSV_HEADERS, log_output_list) - - # --- 8. Google Driveへ同期 --- - if success_csv and gdrive_output_path: - # --- 停止チェック (Drive同期前) --- - if stop_requested: break - success_sync, sync_log = sync_file_to_drive(local_output_path, gdrive_output_path, log_output_list) - # 同期失敗時のログは sync_file_to_drive 内で追加される - - # メモリ上のリストにも追加 - all_company_data_memory.append(company_base_info) - - # 内側のループがbreakされた場合、外側のループもbreakする - if stop_requested: - break - - page_num += 1 - time.sleep(0.8) - - except Exception as e: - error_detail = traceback.format_exc() - log_output_list.append("\n!!!処理中に予期せぬエラーが発生しました!!!") - log_output_list.append(f"エラータイプ: {type(e).__name__}") - log_output_list.append(f"エラーメッセージ: {e}") - log_output_list.append("詳細:") - log_output_list.append(error_detail) - print("!!!処理中に予期せぬエラーが発生しました!!!") - print(error_detail) + print(" エラー: HTMLコンテンツが空のため、情報抽出をスキップします。") + result_details['status'] = 'Error: Empty HTML Content' + + except TimeoutException as e_timeout_detail: + print(f"★★★★★ 詳細ページ読み込みタイムアウト ★★★★★\nURL: {place_url}\n{e_timeout_detail}") + print("--- HTML Snapshot (Timeout) ---") + try: print(driver.page_source[:1000]) + except: print(" ページソース取得失敗") + print("--- End Snapshot ---") + result_details['status'] = f'Error: Detail Page Timeout'; result_details['name'] = f"Error (Timeout R:{i})" + except NoSuchElementException as e_nse: + print(f"★★★★★ 詳細ページで必須要素(h1など)が見つかりません ★★★★★\nURL: {place_url}\n{e_nse}") + print("--- HTML Snapshot (NSE) ---") + try: print(driver.page_source[:1000]) + except: print(" ページソース取得失敗") + print("--- End Snapshot ---") + result_details['status'] = f'Error: Detail Page Missing Element (e.g., h1)'; result_details['name'] = f"Error (ElementNotFound R:{i})" + except Exception as e_detail: + if interrupt_event.is_set(): raise InterruptedError("詳細ページ例外処理中に中断リクエスト") # 例外処理中もチェック + print(f"★★★★★ 詳細ページ処理中に予期せぬエラー ★★★★★\nURL: {place_url}\n{type(e_detail).__name__}: {e_detail}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") + result_details['status'] = f'Error: Detail Page Exception - {type(e_detail).__name__}'; result_details['name'] = f"Error (Exception R:{i})" + finally: + # 中断された場合、ステータスを上書き + if interrupt_event.is_set() and result_details.get('status') != 'Interrupted': + result_details['status'] = 'Interrupted' + results_list.append(result_details) + + except InterruptedError as e_interrupt: # クエリ処理全体で中断をキャッチ + print(f"★★★★★ クエリ '{query}' [Index:{query_index}] の処理中に中断リクエスト: {e_interrupt} ★★★★★") + # 中断されたことを示す結果を追加 + results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 'N/A', 'place_url': 'N/A', 'html_filename': 'N/A', 'name': f'Interrupted Query {query_index}', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Interrupted'}) + # ★重要★ 中断例外を再度発生させ、run_scraping関数に中断を伝える + raise e_interrupt + except Exception as e_main_query: + print(f"★★★★★ クエリ '{query}' [Index:{query_index}] の処理全体でエラー ★★★★★\n{type(e_main_query).__name__}: {e_main_query}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") + results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': 'N/A', 'html_filename': 'N/A', 'name': f'Error (Overall Query {query_index})', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: Query Level Exception - {type(e_main_query).__name__}'}) finally: - if driver: - try: - driver.quit() - log_output_list.append("\n===== WebDriverを終了しました =====") - print("\n===== WebDriverを終了しました =====") - except Exception as e_quit: - log_output_list.append(f"WebDriver終了時にエラーが発生しました: {e_quit}") - print(f"WebDriver終了時にエラーが発生しました: {e_quit}") - - # --- 9. 最終処理と結果返却 --- - if stop_requested: - log_output_list.append("===== 処理はユーザーリクエストにより中断されました =====") + status_msg = "中断" if interrupt_event.is_set() else "完了" + print(f"--- クエリ処理{status_msg} [Index:{query_index}] - {len(results_list)} 件の結果 ---") + return results_list + +# --- 中断リクエスト用関数 --- +def request_interrupt(): + """中断フラグをセットする""" + if not interrupt_event.is_set(): + print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + print("!!! 中断リクエストを受け付けました。 !!!") + print("!!! 現在の処理が完了次第、停止します... !!!") + print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n") + interrupt_event.set() else: - log_output_list.append("\n===== 処理完了 =====") - - final_df = pd.DataFrame() - local_file_ref = None - drive_file_ref = None # DriveファイルはGradio Fileで返さない - - if all_company_data_memory: - processed_count = len(all_company_data_memory) - log_output_list.append(f"合計 {processed_count} 件の会社データを収集し、") - log_output_list.append(f"ローカル '{os.path.basename(local_output_path)}' に保存しました。") - if gdrive_output_path and os.path.exists(gdrive_output_path): - log_output_list.append(f"Google Drive '{gdrive_output_path.replace(DRIVE_MOUNT_POINT, 'MyDrive:/', 1)}' にも同期されました。") - elif gdrive_output_path: - log_output_list.append(f"Google Drive '{os.path.basename(gdrive_output_path)}' への最終同期に失敗または中断された可能性があります。") - - df = pd.DataFrame(all_company_data_memory) - display_columns = ["会社名", "住所", "ホームページURL", "設立日"] - final_df = pd.DataFrame(columns=display_columns) - for col in display_columns: - if col in df.columns: - final_df[col] = df[col] + print("\n--- 中断は既にリクエストされています ---") + # GradioのTextboxに即時反映させるため、ダミーの値を返す + # (clickイベントのoutputsにTextboxを指定する必要があるため) + # 実際にはログは run_scraping 内で更新される + return "[中断リクエスト受信]" + +# --- Gradio Processing Function (中断処理対応) --- +def run_scraping(input_csv_file, output_dir_name, output_csv_name, csv_encoding, + wait_time_base, wait_time_detail, wait_time_search, headless_mode, progress=gr.Progress()): + """Gradioインターフェースから呼び出されるメイン処理関数(中断機能付き)""" + log_stream = io.StringIO() # ログ出力用 + start_time_total = time.time() # 全体処理時間計測開始 + driver = None # WebDriverオブジェクト初期化 + processed_query_count = 0 # 処理済みクエリ数 + total_results_count = 0 # CSV書き込み総行数 + total_queries = 0 # 総クエリ数 + output_csv_path = None # 出力CSVファイルパス + interrupted_flag = False # 処理が中断されたかを示すフラグ + + # --- 中断フラグをリセット --- + interrupt_event.clear() + print("中断フラグをリセットしました。", file=log_stream) + + # 標準出力と標準エラー出力をログストリームにリダイレクト + with contextlib.redirect_stdout(log_stream), contextlib.redirect_stderr(log_stream): + try: + print("=== 処理開始 ===") + print(f"開始時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + # 入力ファイルチェック + if input_csv_file is None: + print("エラー: CSVファイルが選択されていません。処理を中断します。") + yield log_stream.getvalue(), None # ログと空の結果を返す + return + yield log_stream.getvalue(), None # 初期ログをUIに反映 + + # パラメータ設定 + SEARCH_QUERIES_CSV_PATH = input_csv_file.name + OUTPUT_DIR = output_dir_name.strip() or "html_reviews_expanded" + OUTPUT_CSV_FILENAME = output_csv_name.strip() or "結果_reviews_expanded.csv" + CSV_ENCODING = csv_encoding + try: + wait_config = { + 'base': max(1.0, float(wait_time_base)), + 'detail': max(10.0, float(wait_time_detail)), + 'search': max(5.0, float(wait_time_search)) + } + except ValueError: + print("警告: 待機時間に無効な値が入力されました。デフォルト値を使用します。") + wait_config = {'base': 4.0, 'detail': 20.0, 'search': 15.0} + print(f"待機時間設定: 基本={wait_config['base']}秒, 詳細/口コミ={wait_config['detail']}秒, 検索={wait_config['search']}秒") + yield log_stream.getvalue(), None + + # 出力ディレクトリ設定と作成 + if not os.path.isabs(OUTPUT_DIR): + OUTPUT_DIR = os.path.join(os.getcwd(), OUTPUT_DIR) + output_csv_path = os.path.join(OUTPUT_DIR, OUTPUT_CSV_FILENAME) + print(f"HTML出力先ディレクトリ: {OUTPUT_DIR}") + print(f"CSV出力先ファイル: {output_csv_path}") + os.makedirs(OUTPUT_DIR, exist_ok=True) + yield log_stream.getvalue(), None + + # CSVからクエリ読み込み (中断チェックあり) + queries = load_queries(SEARCH_QUERIES_CSV_PATH) + yield log_stream.getvalue(), None + if interrupt_event.is_set(): # 読み込み中に中断されたかチェック + print("CSV読み込み中に中断されたため、処理を終了します。") + interrupted_flag = True + raise InterruptedError("CSV loading interrupted") # 処理を中断フローへ + if not queries: + print("エラー: CSVから処理可能なクエリが見つかりませんでした。処理を終了します。") + yield log_stream.getvalue(), None + return + total_queries = len(queries) + print(f"{total_queries} 件のクエリを処理します。") + yield log_stream.getvalue(), None + + # --- 中断チェック --- + if interrupt_event.is_set(): raise InterruptedError("WebDriver初期化前に中断リクエスト") + + # WebDriver初期化 + progress(0, desc="WebDriver初期化中...") + print("\nWebDriver初期化中...") + yield log_stream.getvalue(), None + options = Options() + options.add_argument('--no-sandbox') + options.add_argument('--disable-dev-shm-usage') + options.add_argument('--lang=ja-JP') + options.add_argument("--window-size=1920,1080") + options.add_argument('--disable-extensions') + options.add_argument('--disable-blink-features=AutomationControlled') + options.add_argument('--disable-gpu') + options.add_experimental_option('excludeSwitches', ['enable-automation']) + options.add_experimental_option('useAutomationExtension', False) + options.add_experimental_option("prefs", { + "credentials_enable_service": False, + "profile.password_manager_enabled": False + }) + if headless_mode: + print(" ヘッドレスモードで実行します。") + options.add_argument('--headless=new') else: - final_df[col] = "取得失敗" - - print(f"合計 {processed_count} 件の会社データを収集完了。") + print(" 通常モード (非ヘッドレス) で実行します。") - if os.path.exists(local_output_path): - local_file_ref = local_output_path - if gdrive_output_path and os.path.exists(gdrive_output_path): - log_output_list.append(f"DriveファイルはGoogle Driveから直接ダウンロードしてください: {gdrive_output_path.replace(DRIVE_MOUNT_POINT, 'MyDrive:/', 1)}") - # drive_file_ref は None のまま - - elif os.path.exists(local_output_path): - log_output_list.append(f"会社データは収集されませんでしたが、ローカルファイル '{os.path.basename(local_output_path)}' は作成/初期化されました。") - print(f"会社データは収集されませんでしたが、ローカルファイル '{os.path.basename(local_output_path)}' は作成/初期化されました。") - local_file_ref = local_output_path - else: - log_output_list.append("会社データは収集されず、ローカルファイルも作成されませんでした。") - print("会社データは収集されず、ローカルファイルも作成されませんでした。") + try: + if IN_COLAB and gs: + print(" Colab環境でgoogle_colab_seleniumを使用します。") + driver = gs.Chrome(options=options) + elif not IN_COLAB and ChromeService and ChromeDriverManager: + try: + print(" webdriver-managerを使用してChromeDriverパスを解決します...") + service = ChromeService(ChromeDriverManager().install()) + driver = webdriver.Chrome(service=service, options=options) + print(" ChromeDriver (webdriver-manager) 起動成功。") + except Exception as e_wdm: + print(f" webdriver-managerでの初期化エラー: {e_wdm}") + print(" PATH上のChromeDriverで試行します...") + driver = webdriver.Chrome(options=options) + print(" ChromeDriver (PATH) 起動成功。") + elif not IN_COLAB: + print(" PATH上のChromeDriverを使用します...") + driver = webdriver.Chrome(options=options) + print(" ChromeDriver (PATH) 起動成功。") + else: + raise Exception("WebDriverを初期化できませんでした。適切なWebDriver設定が見つかりません。") + + driver.implicitly_wait(3) + print("WebDriver初期化完了。") + except Exception as e_wd_init: + print(f"★★★★★ WebDriver初期化失敗 ★★★★★") + print(f"エラータイプ: {type(e_wd_init).__name__}") + print(f"エラーメッセージ: {e_wd_init}") + print("--- スタックトレース ---\n", traceback.format_exc(), "\n----------------------") + print("ヒント: ChromeDriverのバージョンとChromeブラウザのバージョンが一致しているか確認してください。") + if not IN_COLAB: print(" `webdriver-manager`がインストールされていない場合は `pip install webdriver-manager` を試してください。") + yield log_stream.getvalue(), None + return + yield log_stream.getvalue(), None + + # --- 中断チェック --- + if interrupt_event.is_set(): raise InterruptedError("CSV処理開始前に中断リクエスト") + + csv_header = ['QueryIndex', 'OriginalQuery', 'ResultRank', 'Status', 'ExtractedName', + 'ExtractedWebsite', 'ExtractedPhone', 'ExtractedAddress', 'Reviews', + 'ExtractionError', 'PlaceURL', 'DetailHTMLFilename'] + file_exists = os.path.exists(output_csv_path) + file_mode = 'a' if file_exists and os.path.getsize(output_csv_path) > 0 else 'w' + print(f"CSVファイルを '{file_mode}' モードで開きます (エンコーディング: {CSV_ENCODING})。") + yield log_stream.getvalue(), None - final_log_output = "\n".join(log_output_list) - # 戻り値: DataFrame, ローカルファイルパス(or None), Driveファイルパス(常にNone), ログ文字列 - return final_df, local_file_ref, None, final_log_output + try: + with open(output_csv_path, file_mode, newline='', encoding=CSV_ENCODING, errors='replace') as csv_file: + writer = csv.writer(csv_file) + if file_mode == 'w': + print(" 新規CSVファイルのためヘッダー行を書き込みます。") + writer.writerow(csv_header) + csv_file.flush() + elif file_exists: + print(f" 既存ファイル '{os.path.basename(output_csv_path)}' に追記します。") + + for i, query in enumerate(queries, 1): + # --- ループ開始時に中断チェック --- + if interrupt_event.is_set(): + print(f"\n===== クエリ {i}/{total_queries} の処理開始前に中断リクエストを検出 =====") + interrupted_flag = True + break # ループを抜ける + + progress(i / total_queries, desc=f"クエリ {i}/{total_queries} 処理中: {query[:30]}...") + start_time_query = time.time() + print(f"\n===== クエリ {i}/{total_queries} 開始: '{query}' =====") + yield log_stream.getvalue(), None + + results = [] + try: + # --- 単一クエリのスクレイピング処理実行 (中断例外をキャッチ) --- + results = process_single_query_full_list(driver, query, i, OUTPUT_DIR, wait_config) + except InterruptedError as e_interrupt_query: + print(f"クエリ {i} の処理が中断されました: {e_interrupt_query}") + interrupted_flag = True # メインループに中断を伝える + # results には中断時点までの結果が入っている可能性がある + if not any(r['status'] == 'Interrupted' for r in results): + # results に中断を示すものがなければ追加 + results.append({'query_index': i, 'original_query': query, 'result_rank': 'N/A', 'status': 'Interrupted', 'name': f'Interrupted Query {i}', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'extraction_error': str(e_interrupt_query), 'place_url': 'N/A', 'html_filename': 'N/A'}) + + yield log_stream.getvalue(), None + + # --- 取得結果をCSVに書き込み --- + written_count_query = 0 + print(f" クエリ {i} の結果をCSVに書き込み中...") + for result_data in results: + try: + reviews_list = result_data.get('reviews', []) + formatted_reviews = "" + if isinstance(reviews_list, list) and reviews_list: + review_texts = [] + for idx, review_item in enumerate(reviews_list): + if isinstance(review_item, dict): + r_text = str(review_item.get('text', '')).replace('\n', ' ').replace('\r', '') + review_texts.append(f"[{idx+1}] 投稿者: {review_item.get('reviewer', 'N/A')} | 評価: {review_item.get('rating', 'N/A')} | 本文: {r_text}") + elif isinstance(review_item, str): + review_texts.append(f"[{idx+1}] {review_item.replace('n', ' ').replace('r', '')}") + formatted_reviews = "\n\n".join(review_texts) + elif isinstance(reviews_list, str): + formatted_reviews = reviews_list.replace('\n', ' ').replace('\r', '') + + extraction_error_msg = result_data.get('extraction_error', '') + if extraction_error_msg and len(extraction_error_msg) > 500: + extraction_error_msg = extraction_error_msg[:250] + "..." + extraction_error_msg[-250:] + + row_data = [ + result_data.get('query_index', i), result_data.get('original_query', query), + result_data.get('result_rank', 'N/A'), result_data.get('status', 'Unknown'), + result_data.get('name', 'N/A'), result_data.get('url', ''), + result_data.get('phone', 'N/A'), result_data.get('address', 'N/A'), + formatted_reviews, extraction_error_msg, + result_data.get('place_url', 'N/A'), result_data.get('html_filename', 'N/A') + ] + writer.writerow(row_data) + written_count_query += 1 + except Exception as e_write: + print(f"★★★★★ CSV書き込み中にエラーが発生しました (行スキップ) ★★★★★") + print(f"エラーデータ (一部): {str(result_data)[:200]}...") + print(f"エラータイプ: {type(e_write).__name__}: {e_write}") + + csv_file.flush() + total_results_count += written_count_query + processed_query_count += 1 + end_time_query = time.time() + query_status_msg = "中断" if result_data.get('status') == 'Interrupted' else "完了" + print(f"===== クエリ {i}/{total_queries} {query_status_msg} - {written_count_query}件書き込み, 所要時間: {end_time_query - start_time_query:.2f} 秒 =====") + yield log_stream.getvalue(), None + + # 中断フラグが立っていたら、ループを終了 + if interrupted_flag: + print("\n中断リクエストに従い、次のクエリへ進まず処理を終了します。") + break + + # --- クエリ間の待機 (中断可能) --- + if i < total_queries: + sleep_duration = wait_config['base'] * 1.5 + (hash(query + str(i)) % (wait_config['base'] * 1.5)) + sleep_duration = max(wait_config['base'] * 0.8, min(sleep_duration, wait_config['base'] * 4.0)) + print(f"次のクエリまで {sleep_duration:.2f} 秒待機します...") + yield log_stream.getvalue(), None + interruptible_sleep(sleep_duration) + # 待機後にも中断チェック + if interrupt_event.is_set(): + print("待機中に中断リクエストを検出。処理を終了します。") + interrupted_flag = True + break # ループを抜ける + else: + print("\n全クエリの処理が完了しました。") + + except IOError as e_io: + print(f"★★★★★ CSVファイル '{output_csv_path}' のオープン/書き込み中にIOエラー ★★★★★") + print(f"エラータイプ: {type(e_io).__name__}: {e_io}\n--- Traceback ---\n{traceback.format_exc()}\n----------------------") + print("ファイルが他のプログラムで開かれていないか、書き込み権限があるか確認してください。") + output_csv_path = None # 結果ファイルパスを無効化 + except Exception as e_csv_loop: + print(f"★★★★★ CSV処理ループ中に予期せぬエラー ★★★★★") + print(f"エラータイプ: {type(e_csv_loop).__name__}: {e_csv_loop}\n--- Traceback ---\n{traceback.format_exc()}\n----------------------") + + except InterruptedError: # run_scraping全体で中断をキャッチ + print("\n★★★★★ 処理がユーザーによって中断されました ★★★★★") + interrupted_flag = True # 中断フラグを立てる + # ここで特別な処理は不要、finallyブロックで終了処理が行われる + except Exception as e_main: + print(f"\n★★★★★ メイン処理 (run_scraping) 中に予期せぬエラーが発生しました ★★★★★") + print(f"エラータイプ: {type(e_main).__name__}: {e_main}") + print("\n--- スタックトレース ---\n", traceback.format_exc(), "\n----------------------") + # エラー発生時も、可能な限りログと途中までのCSVを返す + + finally: + # --- 終了処理 --- + if driver: + print("\nWebDriver終了処理中...") + try: + driver.quit() + print("WebDriver正常終了。") + except Exception as e_quit: + print(f"★★★★★ WebDriver終了時にエラー ★★★★★") + print(f"エラータイプ: {type(e_quit).__name__}: {e_quit}") + + end_time_total = time.time() + total_duration_seconds = end_time_total - start_time_total + final_status = "中断" if interrupted_flag else "完了" + print(f"\n=== 全処理終了 ({final_status}) ===") + print(f"終了時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print(f"処理{final_status}クエリ数: {processed_query_count}/{total_queries if total_queries > 0 else 'N/A'} 件") + print(f"CSV書き込み総行数: {total_results_count} 件") + print(f"総処理時間: {total_duration_seconds:.2f} 秒 ({total_duration_seconds/60:.2f} 分)") + if interrupted_flag: + print("*** 処理は途中で中断されました ***") + + final_log = log_stream.getvalue() + + # プログレスバーを完了状態にする + progress(1.0, desc=f"処理{final_status}") + + if output_csv_path and os.path.exists(output_csv_path) and os.path.getsize(output_csv_path) > 0: + print(f"結果CSVファイル: {output_csv_path}") + yield final_log, gr.File(value=output_csv_path, label=f"結果CSVダウンロード ({final_status})") + elif output_csv_path: + print(f"警告: 結果CSVファイル '{output_csv_path}' は空または存在しません。") + yield final_log, None + else: + print("結果CSVファイルは生成されませんでした。") + yield final_log, None -# --- CSVダウンロード用関数 (変更なし) --- -def download_local_csv(): - """ - ローカルの一時CSVファイルが存在すれば、そのパスを返す。 - """ - if os.path.exists(LOCAL_CSV_FILENAME): - return LOCAL_CSV_FILENAME - else: - print("ローカルCSVファイルが見つかりません。") - return None -# --- Gradio Interface (停止ボタン追加) --- +# --- Gradio UI 定義 (中断ボタン追加) --- with gr.Blocks(theme=gr.themes.Soft()) as demo: + gr.Markdown("# Google Maps スクレイピング (口コミ全件表示試行・中断機能付き)") gr.Markdown( """ - # Pitact 企業情報 スクレイピング ツール (Drive逐次同期 & 停止機能付き) - 指定されたPitactの一覧ページURLから企業情報を収集し、ローカルおよびGoogle Drive上のCSVに**1件ずつ**保存します。 - **Colab環境での実行を推奨します。** Drive同期機能はColabでのみ有効です。 - 処理中に「処理停止」ボタンを押すと、現在の処理がキリの良いところで中断されます。 + CSVクエリで検索し、詳細ページで「クチコミ」タブをクリック後、口コミエリアを**最後までスクロール**し、 + さらに**「もっと見る」ボタンを全てクリック**して全件表示を試みます。 + その後、基本情報と口コミ情報(`span.wiI7pd`優先)を抽出し、CSVに出力します。 + **「処理中断」ボタン**で進行中の処理を安全に停止できます(現在のクエリ完了後)。 + + **処理フロー:** + 1. クエリ検索 → リストスクロール → リンク抽出。 + 2. 詳細ページ遷移 → **「クチコミ」タブクリック** → 口コミコンテナ待機。 + 3. **口コミエリアを最後までスクロール**。 + 4. **「もっと見る」ボタンを全てクリック** (複数回試行)。 + 5. HTML取得 → **bs4**で解析 (基本情報: `.aIFcqe`優先, 口コミ本文: `span.wiI7pd`優先)。 + 6. 結果をCSVに出力(HTMLも保存)。 + 7. 各ステップおよび待機中に**中断リクエストをチェック**。 + + **注意:** ネットワーク状況やサイト構造の変更により時間がかかる、またはエラーが発生する場合があります。 """ ) + with gr.Row(): - with gr.Column(scale=3): - url_input = gr.Textbox( - label="Pitact 市区町村別一覧 URL", - placeholder="例: https://pitact.com/search/pref-hokkaido/city-1101", - value="https://pitact.com/search/pref-hokkaido/city-1101" - ) - drive_filename_input = gr.Textbox( - label="Google Drive 保存ファイル名 (Colab環境のみ有効)", - placeholder="例: Pitact_結果.csv (MyDrive直下に保存)", - value="Pitact_Scraping_Result.csv" - ) - with gr.Row(): - start_button = gr.Button("スクレイピング開始", variant="primary", scale=2) - stop_button = gr.Button("処理停止", variant="stop", scale=1) # 停止ボタンを追加 with gr.Column(scale=2): - log_output_display = gr.Textbox( - label="処理ログ", - lines=20, - interactive=False, - autoscroll=True - ) - # 停止ボタンのフィードバック用 (非表示でも良い) - stop_feedback = gr.Textbox(label="停止ステータス", interactive=False, visible=True) - + gr.Markdown("### ① 入力ファイルと出力設定") + input_csv_file = gr.File(label="検索クエリCSVファイル (1列目のみ使用)", file_types=[".csv"]) + output_dir_name = gr.Textbox(label="HTML保存先ディレクトリ名", value="html_reviews_expanded") + output_csv_name = gr.Textbox(label="出力CSVファイル名", value="結果_reviews_expanded.csv") + csv_encoding = gr.Dropdown(label="出力CSVエンコーディング", choices=['utf-8-sig', 'cp932'], value='utf-8-sig') + headless_mode = gr.Checkbox(label="ヘッドレスモードで実行 (エラー発生時はOFF推奨)", value=True) + with gr.Column(scale=1): + gr.Markdown("### ② 待機時間設定 (秒)") + wait_time_base = gr.Number(label="基本待機", minimum=1, maximum=20, step=0.5, value=4) + wait_time_detail = gr.Number(label="詳細/口コミ最大待機", minimum=10, maximum=60, step=1, value=25) + wait_time_search = gr.Number(label="検索リスト最大待機", minimum=5, maximum=60, step=1, value=15) - gr.Markdown("---") - gr.Markdown("## 収集結果プレビュー") - dataframe_output = gr.DataFrame( - label="収集データ(会社名, 住所, HP, 設立日) - 最大100件程度表示", - wrap=True - ) - gr.Markdown("---") - gr.Markdown("## ダウンロード") with gr.Row(): - download_button_local = gr.Button("ローカル保存CSVをダウンロード") - download_file_output_local = gr.File(label="ローカルCSVファイル") - - # 開始ボタンのイベント - start_event = start_button.click( - fn=scrape_pitact_gradio, - inputs=[url_input, drive_filename_input], - # 戻り値: DataFrame, local_file_path, None, log_string - outputs=[dataframe_output, download_file_output_local, gr.Textbox(visible=False), log_output_display] - ) - - # 停止ボタンのイベント - stop_button.click( - fn=request_stop, - inputs=[], - outputs=[stop_feedback], # 停止ステータス表示用のTextboxに出力 - cancels=[start_event] # 開始イベントをキャンセルしようと試みる (効果は限定的かも) + start_button = gr.Button("処理開始", variant="primary", size="lg", scale=3) + # --- 中断ボタンを追加 --- + stop_button = gr.Button("処理中断", variant="stop", size="lg", scale=1) + + gr.Markdown("### ③ 処理ステータスとエラーログ") + # プログレスバーを追加 + progress_bar = gr.Progress(track_tqdm=True) + status_textbox = gr.Textbox(label="ログ", lines=25, interactive=False, autoscroll=True, max_lines=2000) + + gr.Markdown("### ④ 結果ダウンロード") + output_csv_download = gr.File(label="結果CSVダウンロード", interactive=False) + + # ボタンクリック時の動作設定 + # 処理開始ボタン + start_button.click( + fn=run_scraping, + inputs=[input_csv_file, output_dir_name, output_csv_name, csv_encoding, + wait_time_base, wait_time_detail, wait_time_search, headless_mode], + outputs=[status_textbox, output_csv_download], + # progress 引数を渡す + show_progress='full' # Gradio 組み込みのプログレス表示を使う場合 ) + # 中断ボタン + # stop_button.click(fn=request_interrupt, inputs=None, outputs=None, cancels=[start_event]) # Gradioのcancel機能を使う場合 + # cancels 引数を使うには、start_button.click の返り値を変数に受ける必要があるが、 + # 複数出力がある場合はタプルになるなど複雑化する。 + # ここでは、Python側でフラグを立ててチェックする方式を採用。 + # request_interrupt の戻り値を status_textbox に一時的に表示する例 + stop_button.click(fn=request_interrupt, inputs=None, outputs=status_textbox) - # ダウンロードボタンのイベント - download_button_local.click( - fn=download_local_csv, - inputs=[], - outputs=[download_file_output_local] - ) - gr.Markdown( - """ - **注意:** - - 「処理停止」ボタンを押しても、現在実行中のネットワークアクセスやファイル書き込みが完了するまで、停止に時間がかかる場合があります。 - - Google Driveへの保存は、処理中に1件ずつ行われます。完了時にDrive上のファイルが最新の状態になります。 - - **Google Drive上のファイルは、この画面から直接ダウンロードできません。** Google Driveのウェブサイトやアプリからアクセスしてください。 - - 長時間実行すると、タイムアウトやリソース制限により中断される可能性があります。 - - スクレイピングは対象サイトの利用規約を確認し、負荷をかけすぎないよう注意して実行してください。 - """ - ) -# Gradioアプリの起動 -if __name__ == "__main__": - demo.launch(debug=True, share=True) \ No newline at end of file +# --- UI起動 --- +print("Gradio UIを起動します...") +# queue()で複数ユーザー対応、share=Trueで共有リンク生成 +demo.queue().launch(share=True, debug=False) \ No newline at end of file