diff --git "a/maps2.py" "b/maps2.py" --- "a/maps2.py" +++ "b/maps2.py" @@ -1,1194 +1,682 @@ +# --- Google Colab で実行する場合、最初に以下のセルを実行してください --- +# !pip install selenium google_colab_selenium webdriver-manager beautifulsoup4 pandas gradio + import gradio as gr -import os -import csv -import json +import pandas as pd +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.common.exceptions import WebDriverException, TimeoutException +try: + import google_colab_selenium as gs # Colab用 + IN_COLAB = True +except ImportError: + IN_COLAB = False import time from bs4 import BeautifulSoup -from selenium.webdriver.chrome.options import Options -from selenium.webdriver.common.by import By # Byクラスをインポート -from selenium.webdriver.support.ui import WebDriverWait -from selenium.webdriver.support import expected_conditions as EC -from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException, ElementClickInterceptedException -import re -from urllib.parse import urlparse, urljoin -import traceback # tracebackをインポート -import io -import contextlib -from datetime import datetime - -# --- WebDriverの選択 --- -IN_COLAB = 'google.colab' in str(get_ipython()) if 'get_ipython' in globals() else False -if IN_COLAB: - print("Google Colab環境を検出。google_colab_selenium を使用します。") - try: import google_colab_selenium as gs - except ImportError: print("google_colab_seleniumが見つかりません。!pip install google-colab-selenium を実行してください。"); gs = None -else: - print("ローカル環境を検出。通常の selenium webdriver を使用します。") - from selenium import webdriver - gs = None +from urllib.parse import urljoin # 相対URLを絶対URLに変換するために使用 +import traceback # エラー詳細表示用 +import os # ファイルパス操作用 +import shutil # ファイルコピー用 (Drive同期) + +# --- Google Drive マウント (Colab環境のみ) --- +DRIVE_MOUNT_POINT = '/content/drive/MyDrive/' +drive = None # Drive オブジェクトを保持 (Colab用) + +# --- 停止フラグ --- +stop_requested = False + +def setup_drive_mount(log_container): + """Colab環境の場合、Google Driveをマウントする""" + global drive + if IN_COLAB: + from google.colab import drive as colab_drive + try: + if not os.path.exists('/content/drive'): + log_container.append("Google Driveをマウントします。認証が必要です...") + print("Google Driveをマウントします。認証が必要です...") + drive = colab_drive + drive.mount('/content/drive') + log_container.append("Google Driveのマウント完了。") + print("Google Driveのマウント完了。") + else: + log_container.append("Google Driveは既にマウントされています。") + print("Google Driveは既にマウントされています。") + # 既存のマウントでも drive オブジェクトをセット + drive = colab_drive + return True, "" + except Exception as e: + error_msg = f"Google Driveのマウント中にエラーが発生しました: {e}\n{traceback.format_exc()}" + log_container.append(error_msg) + print(error_msg) + drive = None # エラー時はNoneに戻す + return False, error_msg + else: + log_container.append("Colab環境ではないため、Driveマウントはスキップします。") + print("Colab環境ではないため、Driveマウントはスキップします。") + return False, "Not in Colab environment." + + +# --- 定数 --- +LOCAL_CSV_FILENAME = "pitact_企業リスト_ローカル一時保存.csv" +CSV_HEADERS = ["会社名", "住所", "ホームページURL", "設立日", "Pitact 詳細URL"] + +# --- Selenium WebDriver 設定 --- +def setup_driver(log_container): + """Selenium WebDriverのインスタンスを設定して返す""" + log_container.append("WebDriverを設定中...") + print("WebDriverを設定中...") + chrome_options = Options() + chrome_options.add_argument('--headless') + chrome_options.add_argument('--no-sandbox') + chrome_options.add_argument('--disable-dev-shm-usage') + chrome_options.add_argument('--log-level=3') + chrome_options.add_experimental_option('excludeSwitches', ['enable-logging']) try: - from selenium.webdriver.chrome.service import Service as ChromeService - from webdriver_manager.chrome import ChromeDriverManager - except ImportError: - print("webdriver-manager が見つかりません。 `pip install webdriver-manager` を実行してください。") - ChromeService = None - ChromeDriverManager = None - - -# --- Helper Functions --- -def find_prefixed_data_string(data_structure): - """データ構造内から ")]}'\n" で始まる文字列を見つける(再帰的検索)""" - if isinstance(data_structure, str) and data_structure.startswith(")]}'\n"): - return data_structure - elif isinstance(data_structure, list): - for item in data_structure: - found = find_prefixed_data_string(item) - if found: - return found - elif isinstance(data_structure, dict): - for value in data_structure.values(): - found = find_prefixed_data_string(value) - if found: - return found - return None - -def find_details_data_by_id_or_heuristic(data_list, place_id=None): - """ - JSONデータリストから詳細情報を含む可能性のあるリストを特定する。 - place_idがあればそれを優先し、なければヒューリスティック(住所形式など)で探す。 - """ - if not isinstance(data_list, list): + if IN_COLAB and 'google_colab_selenium' in globals(): + log_container.append("Colab環境としてWebDriverを起動します。") + print("Colab環境としてWebDriverを起動します。") + driver = gs.Chrome(options=chrome_options) + else: + log_container.append("ローカル環境でのWebDriver起動を試みます。") + print("ローカル環境でのWebDriver起動を試みます。") + try: + from webdriver_manager.chrome import ChromeDriverManager + from selenium.webdriver.chrome.service import Service + service = Service(ChromeDriverManager().install()) + driver = webdriver.Chrome(service=service, options=chrome_options) + log_container.append("webdriver-manager を使用してWebDriverを起動しました。") + print("webdriver-manager を使用してWebDriverを起動しました。") + except ImportError: + log_container.append("webdriver-manager が見つかりません。パスが通っているChromeDriverを使用します。") + print("webdriver-manager が見つかりません。パスが通っているChromeDriverを使用します。") + driver = webdriver.Chrome(options=chrome_options) + + driver.set_page_load_timeout(45) + log_container.append("WebDriverの設定完了。") + print("WebDriverの設定完了。") + return driver + except Exception as e: + error_msg = f"WebDriverの設定中にエラーが発生しました: {e}\n{traceback.format_exc()}" + log_container.append(error_msg) + print(error_msg) return None - potential_candidates = [] - for item in data_list: - # 詳細データは通常、要素数が比較的多いリスト形式 - if not isinstance(item, list) or len(item) < 30: - continue - - is_candidate = False - # place_id が指定されていれば、リスト内にそのIDが含まれるかチェック - if place_id and place_id in str(item): - is_candidate = True - # place_id がない場合は、住所らしき情報が含まれるかヒューリスティックにチェック - elif not place_id: - has_address_like = any( - isinstance(sub, str) and - ("〒" in sub or - any(k in sub for k in ["都", "道", "府", "県", "市", "区", "町", "村", "丁目", "番地", "号"]) or - re.search(r'\d+-\d+-\d+', sub)) - for sub in item - ) - if has_address_like: - is_candidate = True +def get_html_with_driver(driver, url, log_container, wait_time=5): + """既存のSeleniumドライバを使用して指定されたURLからHTMLを取得する。""" + if not driver: + error_msg = "WebDriverが利用できません。\n" + log_container.append(error_msg) + return None, error_msg + log_message = "" + try: + log_msg = f"アクセス中: {url}" + log_container.append(log_msg) + print(log_msg) + driver.get(url) + + log_msg = f"ページ読み込み完了。{wait_time}秒待機..." + log_container.append(log_msg) + print(log_msg) + # 停止リクエストがあれば待機を短縮またはスキップすることも可能だが、 + # ここではシンプルに待機は完了させる + time.sleep(wait_time) + + html = driver.page_source + log_msg = f"{url} からHTMLを正常に取得しました。" + log_container.append(log_msg) + print(log_msg) + return html, log_message + except TimeoutException: + error_msg = f"ページの読み込みがタイムアウトしました: {url}" + log_container.append(error_msg) + print(error_msg) + return None, error_msg + except WebDriverException as e: + # 停止リクエストによる中断の場合、WebDriverExceptionが発生することがある + if stop_requested: + log_container.append("処理停止リクエストによりWebDriver操作が中断された可能性があります。") + return None, "WebDriver操作中断" + error_msg = f"WebDriverエラーが発生しました ({url}): {e}" + log_container.append(error_msg) + print(error_msg) + return None, error_msg + except Exception as e: + error_msg = f"予期せぬエラーが発生しました ({url}): {e}\n{traceback.format_exc()}" + log_container.append(error_msg) + print(error_msg) + return None, error_msg + +# --- BeautifulSoup HTML解析 (変更なし) --- +def parse_listing_page(html_content, log_container): + """企業一覧ページのHTMLを解析し、会社名と詳細ページURLのリストを取得する。""" + if not html_content: + error_msg = "一覧ページのHTMLコンテンツが空です。" + log_container.append(error_msg) + return [], error_msg + log_message = "" + soup = BeautifulSoup(html_content, 'html.parser') + company_list = [] + base_url = "https://pitact.com" # 相対URL解決用 + + company_boxes = soup.find_all('div', class_='box cat_01') + if not company_boxes: + warn_msg = "一覧ページで企業情報が見つかりませんでした(要素 '.box.cat_01')。" + log_container.append(warn_msg) + return [], warn_msg + + log_msg = f"{len(company_boxes)} 件の会社情報を検出。" + # log_container.append(log_msg) # ログが多すぎるのでコメントアウト + # print(log_msg) + + for box in company_boxes: + # --- 停止チェック --- + if stop_requested: + log_container.append("停止リクエスト検知 (一覧ページ解析中)。") + break # このページの残りの解析を中断 + + company_data = {"会社名": "取得失敗", "住所": "取得失敗", "Pitact 詳細URL": "取得失敗"} + name_tag_h2 = box.find('h2', class_='list_tit') + if name_tag_h2 and name_tag_h2.a: + company_data["会社名"] = name_tag_h2.a.get_text(strip=True) + detail_url_relative = name_tag_h2.a.get('href') + if detail_url_relative: + company_data["Pitact 詳細URL"] = urljoin(base_url, detail_url_relative) + else: + log_container.append(f"警告: 会社名/詳細URLタグ ('h2.list_tit > a') が見つからない要素がありました。") + + options_div = box.find('div', class_='list_options') + address_found_in_list = False + if options_div: + details = options_div.find_all('dl') + for detail in details: + dt, dd = detail.find('dt'), detail.find('dd') + if dt and dd and "住所" in dt.get_text(strip=True): + address_text = dd.get_text(strip=True) + if address_text.startswith("〒") and len(address_text.split(maxsplit=1)) > 1: + address_text = address_text.split(maxsplit=1)[1].strip() + company_data["住所"] = address_text + address_found_in_list = True + break - if is_candidate: - potential_candidates.append(item) + if company_data["Pitact 詳細URL"] != "取得失敗": + company_list.append(company_data) + else: + log_container.append(f"警告: '{company_data.get('会社名', '不明')}' の詳細URLが取得できませんでした。この会社はスキップされます。") + + return company_list, log_message + +def parse_detail_page(html_content, log_container): + """企業詳細ページのHTMLを解析し、必要な情報(ホームページURL、設立日、住所)を抽出する。""" + if not html_content: + error_msg = "詳細ページのHTMLコンテンツが空です。" + log_container.append(error_msg) + return {}, error_msg + log_message = "" + soup = BeautifulSoup(html_content, 'html.parser') + details = {"ホームページURL": "取得失敗", "設立日": "取得失敗", "住所": "取得失敗"} + + info_table = soup.find('table', class_='table is-bordered is-fullwidth responsive-stack') + if not info_table: + warn_msg = "詳細ページで企業基本情報テーブルが見つかりませんでした(要素 'table.table.is-bordered...')。" + log_container.append(warn_msg) + return details, warn_msg + + rows = info_table.find_all('tr') + found_count = 0 + for row in rows: + # --- 停止チェック --- + # 詳細ページの解析は比較的速いので、ここではチェックしない方がスムーズかも + # if stop_requested: + # log_container.append("停止リクエスト検知 (詳細ページ解析中)。") + # break + + th_tag = row.find('th') + td_tag = row.find('td') + + if th_tag and td_tag: + header = th_tag.get_text(strip=True) + value_td = td_tag + + if "ホームページURL" in header: + link_tag = value_td.find('a') + if link_tag and link_tag.has_attr('href') and link_tag['href'].startswith('http'): + details["ホームページURL"] = link_tag['href'] + else: + url_text = value_td.get_text(strip=True) + if url_text and url_text != '--': + details["ホームページURL"] = url_text + else: + details["ホームページURL"] = "取得失敗" + found_count += 1 + + elif "設立日" in header: + value_text = value_td.get_text(strip=True).replace('--','').strip() + details["設立日"] = value_text if value_text else "取得失敗" + found_count += 1 + + elif "住所" in header: + address_text = "" + for content in value_td.contents: + if isinstance(content, str): + address_text += content.strip() + elif content.name == 'br': + address_text += " " + elif content.name == 'a' and 'map' in content.get('href', ''): + pass + address_text = ' '.join(address_text.split()) + if address_text.startswith("〒") and len(address_text.split(maxsplit=1)) > 1: + address_text = address_text.split(maxsplit=1)[1].strip() + details["住所"] = address_text if address_text else "取得失敗" + found_count += 1 + + return details, log_message + +# --- 逐次保存用 CSV書き込み関数 (変更なし) --- +def append_to_csv(data_dict, filename, headers, log_container): + """ + 辞書データをCSVファイルに追記する。 + ファイルが存在しない場合や空の場合はヘッダーを書き込む。 + """ + file_exists = os.path.isfile(filename) + is_empty = not file_exists or os.path.getsize(filename) == 0 - if not potential_candidates: - return None + df_to_append = pd.DataFrame([data_dict]) + for col in headers: + if col not in df_to_append.columns: + df_to_append[col] = "取得失敗" + df_to_append = df_to_append[headers] - # 候補が1つならそれを返す - if len(potential_candidates) == 1: - return potential_candidates[0] + try: + df_to_append.to_csv( + filename, + mode='a', + header=is_empty, + index=False, + encoding='utf-8-sig' + ) + return True, "" + except Exception as e: + error_msg = f"警告: '{data_dict.get('会社名', '不明')}' の情報をCSV '{os.path.basename(filename)}' に追記中にエラー: {e}" + log_container.append(error_msg) + print(error_msg) + return False, error_msg - # 候補が複数ある場合、スコアリングで最もそれらしいものを選ぶ - best_candidate = None - max_score = -1 +# --- Google Drive 同期ヘルパー関数 (変更なし) --- +def sync_file_to_drive(local_file_path, gdrive_target_file_path, log_container): + """ + 指定されたローカルファイルをGoogle Driveのターゲットファイルパスにコピーする。 + Colab環境で、Driveがマウントされ、ローカルファイルが存在する場合のみ実行。 + """ + if not IN_COLAB or not drive or not os.path.exists(local_file_path): + return False, "Drive同期の前提条件を満たしていません(非Colab、未マウント、ローカルファイル欠損など)。" - for candidate in potential_candidates: - score = len(candidate) # 要素数が多いほど詳細情報の可能性が高い - try: - # 特定のインデックスにリストが存在するか(構造的な特徴) - if any(isinstance(candidate[idx], list) and candidate[idx] for idx in [7, 13, 178] if idx < len(candidate)): - score += 50 - # URLらしき文字列が含まれるか - if 7 < len(candidate) and isinstance(candidate[7], list) and len(candidate[7]) > 0 and isinstance(candidate[7][0], str) and candidate[7][0].startswith('http'): - score += 50 - # 別の構造的な特徴 - if 34 < len(candidate) and isinstance(candidate[34], list) and candidate[34]: - score += 30 - except Exception: - # スコアリング中のエラーは無視 - pass - - if score > max_score: - max_score = score - best_candidate = candidate - - return best_candidate - - -def is_domain_like(text): - """文字列がドメイン名らしい形式か簡易的に判定""" - if not isinstance(text, str): return False - text = text.strip().lower() - common_tlds = ['.com', '.jp', '.co.jp', '.net', '.org', '.info', '.biz'] - # URLスキーマ、パス、特殊文字、全角文字、IPアドレス形式、前後のドット、連続ドットは除外 - if re.search(r'^(https?|ftp)://|[/\\?#\s\u3000-\uFFFF:;@!$%^*()=+]', text): return False - if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', text): return False - if text.startswith('.') or text.endswith('.') or '..' in text: return False - # ドットを含み、一般的なTLDで終わるかチェック - return '.' in text and any(text.endswith(tld) for tld in common_tlds) - -def safe_get(data, index, default=None): - """ネストされたリストや辞書から安全に値を取得する""" - if isinstance(index, int): - try: - return data[index] if isinstance(data, list) and index < len(data) else default - except IndexError: - return default - elif isinstance(index, list): # インデックスのリストでネストされた要素を取得 - current = data - for idx in index: - try: - if isinstance(current, list) and isinstance(idx, int) and idx < len(current): - current = current[idx] - elif isinstance(current, dict) and idx in current: - current = current[idx] - else: - return default # 途中でリスト/辞書でない、またはインデックス/キーが存在しない場合 - except (IndexError, KeyError, TypeError): - return default # その他の予期せぬエラー - return current - elif isinstance(index, str): # 文字列インデックスは辞書のキーとして扱う - return data.get(index, default) if isinstance(data, dict) else default - return default - - -# --- HTML抽出関数 (本文抽出を span.wiI7pd 優先に変更) --- -def extract_details_and_reviews_from_html(html_content): - """詳細HTMLから基本情報と口コミ情報を抽出 (本文は span.wiI7pd 優先)""" - print(" [HTML Extractor - Details & Reviews (wiI7pd priority)] 開始") - soup = BeautifulSoup(html_content, 'lxml' if 'lxml' in globals() else 'html.parser') - details = {"name": "N/A", "url": "", "phone": "N/A", "address": "N/A", "links": {}, "reviews": [], "extraction_error": None} try: - # --- 基本情報の抽出 --- - main_container_selector = '.aIFcqe' - main_container = soup.select_one(main_container_selector) - search_root = soup # デフォルトはページ全体 - if main_container: - print(f" '{main_container_selector}' コンテナ発見。基本情報を抽出。") - search_root = main_container - else: - print(f" 警告: '{main_container_selector}' コンテナが見つかりません。ページ全体から基本情報を抽出。") - - # 名前 (h1タグを探す) - name_tag = search_root.find('h1') - if name_tag: - details['name'] = name_tag.get_text(strip=True) - elif details['name'] == 'N/A': # フォールバックでから取得 - title_tag = soup.find('title') - if title_tag and title_tag.string: - title_text = title_tag.string.replace('- Google マップ', '').strip() - if title_text.lower() != "google マップ": details["name"] = title_text - - # 電話、住所、ウェブサイトなどの情報を抽出 - selectors_map = { - "phone": ['button[data-item-id^="phone:tel:"]', 'div.Io6YTe', 'button[aria-label*="電話番号"]'], - "address": ['button[data-item-id="address"]', 'div.rogA2c', 'button[aria-label*="住所"]'], - "website": ['a[data-item-id="authority"][href^="http"]', 'button[data-item-id="authority"]', 'a[aria-label*="ウェブサイト"][href^="http"]'], - "other_link": ['a.CsEnBe[href^="http"]'] # 公式サイト以外のリンク - } - - for info_type, selectors in selectors_map.items(): - found_val = None - for selector in selectors: - element = search_root.select_one(selector) - # コンテナ内で見つからなければページ全体で再検索 - if not element and search_root != soup: - element = soup.select_one(selector) - - if element: - data_item_id = element.get('data-item-id', '') - aria_label = element.get('aria-label', '') - element_text = element.get_text(strip=True) - href = element.get('href') - - if info_type == "phone": - phone_num = None - if data_item_id.startswith('phone:tel:'): phone_num = data_item_id.split(':')[-1] - elif "電話番号:" in aria_label: phone_num = re.search(r'([\d-]+)', aria_label.split("電話番号:")[-1]) - elif element.name == 'div' and re.match(r'^[\d\s-]+$', element_text): phone_num = element_text - # 電話番号形式の整形と検証 - if isinstance(phone_num, str): phone_num = phone_num.strip() - elif hasattr(phone_num, 'group'): phone_num = phone_num.group(1).strip() - if phone_num and re.match(r'^[\d-]+$', phone_num.replace('ー','-')): - found_val = phone_num.replace('ー','-') - break # 電話番号が見つかったらループ脱出 - elif info_type == "address": - addr_text = None - if data_item_id == 'address': addr_text = element_text - elif "住所:" in aria_label: addr_text = aria_label.split("住所:")[-1].split('(新しいウィンドウで開きます)')[0].strip() - elif element.name == 'div' and ("〒" in element_text or any(k in element_text for k in ["都","道","府","県","市","区","町","村"])): addr_text = element_text - # 住所らしき文字列か簡易チェック - if addr_text and len(addr_text) > 5: # ある程度の長さがあるか - found_val = addr_text - break # 住所が見つかったらループ脱出 - elif info_type == "website" or info_type == "other_link": - if href and href.startswith('http') and 'google.com' not in urlparse(href).netloc: # Google自身のリンクは除外 - link_name = "N/A"; is_website = False - # リンクの種類を判別 - if data_item_id == 'authority' or "ウェブサイト" in aria_label: - link_name = element_text if is_domain_like(element_text) else "ウェブサイト" - is_website = True - elif info_type == "other_link": - link_name = f"リンク ({element_text})" if element_text else "外部リンク" - elif is_domain_like(element_text): # ドメイン名らしきテキストの場合 - link_name = element_text - - if link_name != "N/A": - normalized_url = href.rstrip('/') - # 重複を避けて links 辞書に追加 - if not any(existing_url.rstrip('/') == normalized_url for existing_url in details["links"].values()): - details["links"][link_name] = href - # website タイプで見つかったものを優先的にメインURL候補へ (まだ未設定の場合) - if is_website and details["url"] == "": - details["url"] = href - # website タイプならこのセレクタでの探索は終了 - if info_type == "website": - found_val = href # 見つかったことを示す - break # websiteセレクタのループ脱出 - - # 各タイプの最初の有効な値を details に格納 (other_link は除く) - if found_val and info_type in details and info_type != "other_link": - details[info_type] = found_val - - # メインURLがまだ決まっていない場合、links 辞書から探す - if details["url"] == "": - priority = ["ウェブサイト", "authority"] # 公式サイトらしき名前を優先 - found_url_in_links = False - for p_word in priority: - for name, url in details["links"].items(): - if p_word in name.lower(): - details["url"] = url - found_url_in_links = True - break - if found_url_in_links: - break - # それでも見つからなければ、ドメイン名らしきリンク > 最初のリンク - if not found_url_in_links: - domain_link = next((url for name, url in details["links"].items() if is_domain_like(name)), None) - if domain_link: - details["url"] = domain_link - elif details["links"]: # linksに何かあれば最初のものをURLとする - details["url"] = next(iter(details["links"].values())) - print(f" 基本情報抽出完了: Name='{details['name']}'") - - - # --- 口コミ情報の抽出 --- - print(" 口コミ情報抽出開始 (span.wiI7pd 優先)...") - review_container_selector = 'div.GHT2ce.NsCY4' - review_container = soup.select_one(review_container_selector) - if review_container: - print(f" '{review_container_selector}' 口コミコンテナ発見。") - # 口コミカードの特定 (jftiEf or MyEned) - review_card_selectors = ['div.jftiEf', 'div.MyEned'] - review_cards = [] - for sel in review_card_selectors: - review_cards = review_container.select(sel) - if review_cards: - print(f" 口コミカードセレクタ '{sel}' で {len(review_cards)} 件発見。") - break - if not review_cards: - print(" 警告: 口コミコンテナ内で口コミカードが見つかりません。") - - extracted_reviews = [] - for card_idx, card in enumerate(review_cards): - try: - review_text = "N/A"; reviewer_name = "N/A"; rating = "N/A" - - # 口コミ本文抽出 (span.wiI7pd 優先) - text_span_wiI7pd = card.select_one('span.wiI7pd') - if text_span_wiI7pd: - review_text = text_span_wiI7pd.get_text(strip=True) - else: - # フォールバック: span[jscontroller="MZnM8e"] - full_text_span = card.select_one('span[jscontroller="MZnM8e"]') - if full_text_span: - review_text = full_text_span.get_text(strip=True) - - # 投稿者名 (.d4r55) - name_el = card.select_one('.d4r55'); - if name_el: reviewer_name = name_el.get_text(strip=True) - - # 評価 (.kvMYJc aria-label) - rating_el = card.select_one('.kvMYJc'); - if rating_el: - aria_label = rating_el.get('aria-label', ''); - match = re.search(r'星 (\d+(\.\d+)?)', aria_label) # "星 5.0" などを想定 - if match: rating = match.group(1) - - # 情報が一部でも取れていれば追加 - if review_text != "N/A" or reviewer_name != "N/A": - extracted_reviews.append({"reviewer": reviewer_name, "rating": rating, "text": review_text if review_text != "N/A" else ""}) - - except Exception as e_card: - print(f" 口コミカード {card_idx+1} の解析中にエラー: {e_card}") - extracted_reviews.append({"reviewer": "Error", "rating": "N/A", "text": f"解析エラー: {e_card}"}) - - details['reviews'] = extracted_reviews - print(f" 口コミ抽出完了: {len(details['reviews'])} 件") - else: - print(f" 警告: '{review_container_selector}' 口コミコンテナが見つかりません。") - - except Exception as e_extract: - print(f"★★★★★ HTML抽出処理中にエラーが発生しました ★★★★★") - error_trace = traceback.format_exc() - print(error_trace) - details['extraction_error'] = f"Type: {type(e_extract).__name__}, Msg: {e_extract}\nTrace: {error_trace}" - - print(f" [HTML Extractor - Details & Reviews (wiI7pd priority)] 完了: Name='{details['name']}'") - return details - - -# --- CSV Loading Function --- -def load_queries(csv_path): - """CSVファイルを読み込み、1列目のクエリをリストとして返す""" - queries = [] - encodings_to_try = ['utf-8-sig', 'utf-8', 'cp932', 'shift_jis'] # 試すエンコーディングリスト - file_encoding = None - print(f"CSVファイル読み込み開始: {os.path.basename(csv_path)}") - if not csv_path or not os.path.exists(csv_path): - print("エラー: CSVファイルが見つかりません。") - return [] - - # ファイルのエンコーディングを特定 - for encoding in encodings_to_try: - try: - with open(csv_path, 'r', encoding=encoding, errors='strict') as f: - f.read(1024) # ファイルの一部を読んでエンコーディングを確認 - file_encoding = encoding - print(f" エンコーディング '{encoding}' で読み込み試行...") - break - except (UnicodeDecodeError, LookupError): - continue # 次のエンコーディングを試す - except Exception as e_enc: - print(f" '{encoding}' 試行中に予期せぬエラー: {e_enc}") - continue - - if not file_encoding: - print(f"エラー: ファイル '{os.path.basename(csv_path)}' を読み込め���エンコーディングが見つかりません。") - return [] - - line_num = 0 + gdrive_target_dir_path = os.path.dirname(gdrive_target_file_path) + if not os.path.exists(gdrive_target_dir_path): + log_msg = f"Drive側ディレクトリ作成試行: {gdrive_target_dir_path}" + # log_container.append(log_msg) + # print(log_msg) + os.makedirs(gdrive_target_dir_path, exist_ok=True) + # log_container.append(f"Drive側ディレクトリ '{os.path.basename(gdrive_target_dir_path)}' 作成完了 (または既存)。") + + shutil.copy2(local_file_path, gdrive_target_file_path) + sync_msg = f"-> Drive同期成功: {os.path.basename(local_file_path)} -> {gdrive_target_file_path.replace(DRIVE_MOUNT_POINT, 'MyDrive:/', 1)}" + print(sync_msg) # 同期成功はコンソールのみ表示 + return True, "" + + except Exception as e_sync: + error_msg = f"★★★★★ Google Driveへのファイル同期中にエラー ★★★★★\n" + error_msg += f" ローカルファイル: {local_file_path}\n" + error_msg += f" Driveターゲットパス: {gdrive_target_file_path}\n" + error_msg += f" エラー: {type(e_sync).__name__}: {e_sync}\n" + error_msg += " 警告: このファイルの同期に失敗しました。" + log_container.append(error_msg) + print(error_msg) + return False, error_msg + +# --- 停止リクエスト処理関数 --- +def request_stop(): + """停止フラグを立て、ログにメッセージを表示する""" + global stop_requested + if not stop_requested: # 複数回押されてもログは1回だけ + print("停止リクエストを受け付けました。") + stop_requested = True + # ログ表示エリアにもメッセージを追加したいが、 + # この関数はフラグを立てるだけにして、scrape関数内でログを出す方が良い + # return "停止リクエストを受け付けました。現在のイテレーション完了後に停止を試みます..." + # else: + # return "既に停止リクエスト受付済みです。" + return "停止リクエスト受付済み。処理中断を試みます..." # Gradioボタンのフィードバック用 + +# --- スクレイピング実行関数 (停止チェック追加) --- +def scrape_pitact_gradio(base_url, drive_filename, progress=gr.Progress(track_tqdm=True)): + """ + Gradioインターフェースから呼び出されるスクレイピング処理関数。 + 停止リクエストがあれば中断する。 + """ + global stop_requested + stop_requested = False # 開始時に必ずリセット + + log_output_list = ["処理を開始します..."] + all_company_data_memory = [] + page_num = 1 + total_companies_processed = 0 + driver = None + local_output_path = os.path.join(".", LOCAL_CSV_FILENAME) + gdrive_output_path = None + final_drive_file_ref = None + + # --- 1. Drive マウント --- + drive_mounted, mount_msg = setup_drive_mount(log_output_list) + if IN_COLAB and not drive_mounted: + log_output_list.append("エラー: Google Driveのマウントに失敗したため、処理を中止します。") + return pd.DataFrame(), local_output_path, None, "\n".join(log_output_list) + if IN_COLAB and drive_filename: + safe_drive_filename = "".join(c if c.isalnum() or c in ('_', '-', '.') else '_' for c in drive_filename) + if not safe_drive_filename.lower().endswith('.csv'): + safe_drive_filename += '.csv' + gdrive_output_path = os.path.join(DRIVE_MOUNT_POINT, safe_drive_filename) + log_output_list.append(f"Google Drive 保存先: {gdrive_output_path.replace(DRIVE_MOUNT_POINT, 'MyDrive:/', 1)}") + elif IN_COLAB and not drive_filename: + log_output_list.append("警告: Driveファイル名が指定されていません。Driveへの保存は行われません。") + gdrive_output_path = None + else: + gdrive_output_path = None + + # --- 2. 入力URL検証 --- + if not base_url or not base_url.startswith("https://pitact.com/search/"): + log_output_list.append("エラー: 無効なPitactの検索URLです。") + return pd.DataFrame(), local_output_path, None, "\n".join(log_output_list) + + log_output_list.append(f"対象URL: {base_url}") + log_output_list.append(f"ローカル保存先: '{os.path.basename(local_output_path)}'") + if gdrive_output_path: + log_output_list.append(f"Drive同期先: '{os.path.basename(gdrive_output_path)}'") + + # --- 3. ローカルCSVファイル初期化 --- try: - with open(csv_path, 'r', encoding=file_encoding, newline='') as f: - reader = csv.reader(f) - try: - header = next(reader) # 最初の行を読み込む - line_num += 1 - print(f" 1��目 (ヘッダー可能性あり): {header}") - except StopIteration: - print("情報: CSVファイルが空です。") - return [] # ファイルが空なら終了 - - # 1行目がヘッダーかどうかを判定 (簡易的) - header_keywords = ['query', 'search', 'keyword', 'クエリ', '検索', 'キーワード', '店舗', '会社'] - first_col_header = header[0].strip().lower() if header else "" - is_header = any(hkw in first_col_header for hkw in header_keywords) - - # 1行目がヘッダーでなく、かつ内容があればクエリとして追加 - if not is_header and header and header[0].strip(): - queries.append(header[0].strip()) - elif is_header: - print(" 1行目はヘッダーと判断しスキップします。") - - # 2行目以降を処理 - for row in reader: - line_num += 1 - # 1列目にデータがあればクエリとして追加 - if row and row[0].strip(): - queries.append(row[0].strip()) - # 1列目が空でも他の列にデータがあれば警告を表示 (スキップ対象) - elif any(cell.strip() for cell in row): - print(f"警告: 行 {line_num} の1列目が空です: {row}。スキップします。") - - print(f" CSVから {len(queries)} 件の有効なクエリを抽出しました。") + if os.path.exists(local_output_path): + os.remove(local_output_path) + log_output_list.append(f"既存のローカルファイル '{os.path.basename(local_output_path)}' を削除しました。") + pd.DataFrame(columns=CSV_HEADERS).to_csv(local_output_path, index=False, encoding='utf-8-sig') + log_output_list.append(f"ローカル出力ファイル '{os.path.basename(local_output_path)}' を初期化しました。") except Exception as e: - # CSV処理中のエラーハンドリング - print(f"★★★★★ CSVファイル処理中にエラー (行 {line_num}) ★★★★★") - print(f"エラータイプ: {type(e).__name__}") - print(f"エラーメッセージ: {e}") - print("--- スタックトレース ---") - print(traceback.format_exc()) - print("----------------------") - return [] # エラー発生時は空リストを返す - return queries - - -# --- Single Query Processing Function (「もっと見る」クリック追加, SyntaxError修正) --- -def process_single_query_full_list(driver, query, query_index, output_dir, wait_config): - """単一クエリ処理: 検索→リストスクロール→リンク抽出→詳細ページ→口コミタブ→口コミスクロール→「もっと見る」クリック→HTML取得→解析""" - print(f"\n--- クエリ処理開始 [Index:{query_index}] ---: {query}") - results_list = [] - safe_query_part = re.sub(r'[\\/*?:"<>|]', '_', query)[:30].strip() or "empty_query" - base_url = "https://www.google.com/maps/" - - # 待機時間設定 - WAIT_TIME_BASE = wait_config['base'] - WAIT_TIME_DETAIL = wait_config['detail'] - WAIT_TIME_SEARCH = wait_config['search'] - # スクロール設定 - SCROLL_PAUSE_TIME = max(1.5, WAIT_TIME_BASE * 0.5) - MAX_SCROLL_ATTEMPTS = 30 - SCROLL_PAUSE_TIME_REVIEW = max(1.0, WAIT_TIME_BASE * 0.3) - MAX_SCROLL_ATTEMPTS_REVIEW = 500 # 口コミは多い場合があるので回数を増やす - REVIEW_SCROLL_STUCK_LIMIT = 5 # 口コミスクロール停止判定の閾値 + log_output_list.append(f"エラー: ローカル出力ファイルの初期化/削除に失敗: {e}") + return pd.DataFrame(), local_output_path, None, "\n".join(log_output_list) + # --- 4. WebDriver起動 --- try: - # 1. 検索実行とリスト表示待機 - search_url = f"https://www.google.com/maps/search/{query.replace(' ', '+')}" - print(f" URLにアクセス: {search_url}") - driver.get(search_url) - print(f" 検索結果リスト表示待機 (最大{WAIT_TIME_SEARCH}秒)...") - list_container_selector = 'div[role="feed"], div[aria-label*="の検索結果"]' - try: - list_container = WebDriverWait(driver, WAIT_TIME_SEARCH).until( - EC.presence_of_element_located((By.CSS_SELECTOR, list_container_selector)) - ) - # リスト内に最初の場所リンクが表示されるまで待つ - WebDriverWait(driver, 10).until( - EC.visibility_of_element_located((By.CSS_SELECTOR, f'{list_container_selector} a[href*="/maps/place/"]')) - ) - print(" 検索結果リスト表示を確認。") - except TimeoutException as e_timeout: - # タイムアウトエラー処理 - print(f" エラー: 検索結果リストの表示タイムアウト。URL: {search_url}\n{e_timeout}") - # ★★★ SyntaxError修正 ★★★ - print("--- HTML Snapshot (Timeout) ---") - try: - print(driver.page_source[:1000]) # デバッグ用にHTMLの先頭を表示 - except Exception as e_snapshot: - print(f" ページソース取得失敗: {e_snapshot}") - print("--- End Snapshot ---") - results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': search_url, 'html_filename': 'N/A', 'name': f'Error (List Timeout)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: List Timeout'}) - return results_list # このクエリはこれ以上処理できない - except Exception as e_wait: - # その他のリスト待機エラー処理 - print(f"★★★★★ リスト待機中に予期せぬエラー ★★★★★\nURL: {search_url}\n{type(e_wait).__name__}: {e_wait}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") - results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': search_url, 'html_filename': 'N/A', 'name': f'Error (List Wait Exception)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: List Wait Exception'}) - return results_list # このクエリはこれ以上処理できない - - # 2. 検索リストのスクロール - print(" 検索リストをスクロールして全結果を表示...") - last_height = driver.execute_script("return arguments[0].scrollHeight", list_container) - scroll_attempts = 0 - stuck_count = 0 - while scroll_attempts < MAX_SCROLL_ATTEMPTS: - try: - # スクロール実行 - driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', list_container) - time.sleep(SCROLL_PAUSE_TIME) - # 新しい高さを取得 - new_height = driver.execute_script("return arguments[0].scrollHeight", list_container) - # 終了マーカーを探す - end_markers = driver.find_elements(By.XPATH, "//span[contains(text(), '結果は以上です')] | //p[contains(text(), '結果は以上です')]") - if any(el.is_displayed() for el in end_markers): - print(" 「結果は以上です」表示確認。検索リストスクロール終了。") - break - # 高さが変わらなくなった場合の処理 - if new_height == last_height: - stuck_count += 1 - print(f" 検索リストスクロール高さ変化なし ({stuck_count}回目)。再試行...") - time.sleep(SCROLL_PAUSE_TIME * 1.5) # 少し長めに待つ - new_height = driver.execute_script("return arguments[0].scrollHeight", list_container) # 再確認 - if new_height == last_height and stuck_count >= 3: # 3回連続で変化なければ終了と判断 - print(" 高さ変化なしが続いたため、検索リストスクロール終了と判断。") - break + driver = setup_driver(log_output_list) + if not driver: + log_output_list.append("エラー: WebDriverの起動に失敗しました。処理を中止します。") + return pd.DataFrame(), local_output_path, None, "\n".join(log_output_list) + log_output_list.append("WebDriverの起動成功。") + + # --- 5. メインループ (ページ単位) --- + while True: + # --- 停止チェック (ページ処理開始前) --- + if stop_requested: + log_output_list.append("\n===== 停止リクエストにより処理を中断します =====") + break + + page_log = f"\n===== 一覧ページ {page_num} の処理開始 =====" + log_output_list.append(page_log) + print(page_log) + + current_list_url = base_url if page_num == 1 else f"{base_url}/page-{page_num}" + listing_html, fetch_log = get_html_with_driver(driver, current_list_url, log_output_list, wait_time=6) + + # --- 停止チェック (HTML取得後) --- + if stop_requested: + log_output_list.append("\n===== 停止リクエストにより処理を中断します =====") + break + if not listing_html: + log_output_list.append(f"一覧ページ {page_num} の取得に失敗したため、処理を停止します。") + break + + companies_on_page, parse_log = parse_listing_page(listing_html, log_output_list) + + # --- 停止チェック (一覧解析後) --- + if stop_requested: + log_output_list.append("\n===== 停止リク���ストにより処理を中断します =====") + break + if not companies_on_page: + if page_num == 1: + log_output_list.append(f"一覧ページ {page_num} で会社が見つかりませんでした。URL誤りかサイト構造変更の可能性があります。") else: - stuck_count = 0 # 高さが変わればリセット - last_height = new_height - except Exception as e_scroll: - # スクロール中のエラー - print(f"★★★★★ 検索リストスクロール中にエラー ★★★★★\n{type(e_scroll).__name__}: {e_scroll}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") - print(" スクロールエラー発生。可能な範囲で続行します。") - # スクロールエラーは致命的ではない場合もあるので続行 - scroll_attempts += 1 - if scroll_attempts >= MAX_SCROLL_ATTEMPTS: - print(f" 検索リスト最大スクロール回数 ({MAX_SCROLL_ATTEMPTS}) 到達。") - - # 3. リンク抽出 - print(" 検索結果リストからリンクを抽出...") - unique_place_links = set() - result_card_selector = '.hfpxzc' # メインのセレクタ - try: - # スクロール後にコンテナ要素を再取得 - list_container_updated = WebDriverWait(driver, 10).until( - EC.presence_of_element_located((By.CSS_SELECTOR, list_container_selector)) - ) - # メインセレクタで要素を検索 - result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) - print(f" '{result_card_selector}' 要素を {len(result_cards)} 件発見。") - - # メインセレクタで見つからない場合の代替セレクタ試行 - if not result_cards: - print(f" 警告: '{result_card_selector}' が見つかりません。代替セレクタ 'a.hfpxzc' で試行...") - result_card_selector = 'a.hfpxzc' # aタグ自体がクラスを持つ場合 - result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) - print(f" 代替セレクタで {len(result_cards)} 件発見。") - if not result_cards: - print(f" 警告: 代替セレクタ 'a.Nv2PK' で試行...") # さらなる代替 - result_card_selector = 'a.Nv2PK' - result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) - print(f" 代替セレクタで {len(result_cards)} 件発見。") - - # 取得した要素からリンクURLを抽出 - link_extraction_errors = 0 - for card_idx, card in enumerate(result_cards): - try: - link_element = None - if card.tag_name == 'a': # 要素自体がaタグの場合 - link_element = card - else: # 要素内にaタグを探す - try: - link_element = card.find_element(By.TAG_NAME, 'a') - except NoSuchElementException: - continue # aタグがなければスキップ - - if link_element: - href = link_element.get_attribute('href') - # Google Mapsの場所詳細URLか確認 - if href and "/maps/place/" in href and not href.startswith("javascript:"): - absolute_href = urljoin(base_url, href) # 相対URLを絶対URLに変換 - unique_place_links.add(absolute_href) # setに追加して重複排除 - except StaleElementReferenceException: - # 要素が古くなった場合は無視して次へ - link_extraction_errors += 1 - continue - except Exception as e_extract_link: - # その他のリンク抽出エラー - print(f"★★★★★ カード {card_idx+1} からのリンク抽出エラー ★★★★★\n{type(e_extract_link).__name__}: {e_extract_link}") - link_extraction_errors += 1 - if link_extraction_errors > 0: - print(f" リンク抽出中に {link_extraction_errors} 件のエラーが発生しました。") - print(f" 抽出したユニークリンク数: {len(unique_place_links)}") - except Exception as e_find_links: - # リンク抽出プロセス全体のエラー - print(f"★★★★★ リンク抽出プロセス全体でエラー ★★★★★\n使用したセレクタ: '{result_card_selector}'\n{type(e_find_links).__name__}: {e_find_links}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") - results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': driver.current_url, 'html_filename': 'N/A', 'name': f'Error (Link Extraction Fail)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: Link Extraction Fail'}) - return results_list # リンクがなければ続行不可 - - # 有効なリンクが見つからなかった場合 - if not unique_place_links: - print(" 有効な詳細ページリンクが見つかりませんでした。このクエリの結果はありません。") - results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': driver.current_url, 'html_filename': 'N/A', 'name': 'No Results Found', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Success: No Results'}) - return results_list # 結果がないのでこのクエリは終了 - - # 4. 各リンクの詳細ページを処理 - print(f" {len(unique_place_links)} 件の詳細情報を取得...") - link_list = sorted(list(unique_place_links)) - processed_urls = set() - - for i, place_url in enumerate(link_list, 1): - if place_url in processed_urls: continue # 既に処理済みのURLはスキップ - processed_urls.add(place_url) - - print(f"\n --- 詳細取得 [Query:{query_index}, Result:{i}/{len(link_list)}] ---") - # 結果格納用辞書初期化 - result_details = {'query_index': query_index, 'original_query': query, 'result_rank': i, 'place_url': place_url, 'html_filename': 'N/A', 'name': 'N/A', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Pending', 'extraction_error': None} - - try: - # 詳細ページへ遷移 - print(f" 詳細ページに遷移: {place_url}") - driver.get(place_url) - # ページタイトル(h1)が表示されるまで待機 - WebDriverWait(driver, WAIT_TIME_DETAIL).until( - EC.visibility_of_element_located((By.CSS_SELECTOR, 'h1')) - ) - time.sleep(WAIT_TIME_BASE * 0.2) # 少し待機 - - # --- 口コミタブをクリック --- - review_tab_text = "クチコミ" # カタカナ指定 - review_tab_xpath = f"//button[@role='tab'][contains(., '{review_tab_text}') or contains(@aria-label, '{review_tab_text}')]" - review_tab_clicked = False - review_scroll_element = None # 口コミスクロール対象要素 - try: - print(f" {review_tab_text}タブ クリック試行 (XPath: {review_tab_xpath})...") - # クリック可能になるまで待機 - review_tab = WebDriverWait(driver, 10).until( - EC.element_to_be_clickable((By.XPATH, review_tab_xpath)) - ) - # スクロールして表示させてからJavaScriptでクリック - driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", review_tab) - time.sleep(0.3) # スクロール安定待ち - driver.execute_script("arguments[0].click();", review_tab) - review_tab_clicked = True - print(f" {review_tab_text}タブをクリックしました。口コミコンテナ表示待機...") - - # --- 口コミコンテナ表示待機 & 要素取得 --- - review_container_selector = 'div.GHT2ce.NsCY4' # 口コミコンテナのセレクタ - # コンテナ内の最初のレビューカード(どちらかのクラス) - first_review_card_selector = f'{review_container_selector} div.jftiEf:first-of-type, {review_container_selector} div.MyEned:first-of-type' - - # コンテナ要素が表示されるまで待機し、スクロール対象として取得 - review_scroll_element = WebDriverWait(driver, WAIT_TIME_DETAIL).until( - EC.visibility_of_element_located((By.CSS_SELECTOR, review_container_selector)) - ) - # 念のため、最初のカードも表示されるか短時間待つ - WebDriverWait(driver, 5).until( - EC.visibility_of_element_located((By.CSS_SELECTOR, first_review_card_selector)) - ) - print(f" 口コミコンテナ ('{review_container_selector}') 表示確認、スクロール要素取得。") - time.sleep(WAIT_TIME_BASE * 0.5) # レンダリング待ち - - except TimeoutException: - print(f" 警告: {review_tab_text}タブまたは口コミコンテナの表示タイムアウト。") - except ElementClickInterceptedException: - print(f" 警告: {review_tab_text}タブのクリックが遮られました。") - except NoSuchElementException: - print(f" 警告: {review_tab_text}タブが見つかりません。") - except Exception as e_click_review: - # その他のタブクリック関連エラー - print(f"★★★★★ {review_tab_text}タブ処理中に予期せぬエラー ★★★★★\n{type(e_click_review).__name__}: {e_click_review}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") - - # --- 口コミエリアのスクロール処理 --- - if review_scroll_element: - print(" 口コミエリアをスクロールして全件表示試行...") - review_last_height = driver.execute_script("return arguments[0].scrollHeight", review_scroll_element) - review_scroll_attempts = 0 - review_stuck_count = 0 - while review_scroll_attempts < MAX_SCROLL_ATTEMPTS_REVIEW: - try: - # スクロール実行 - driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', review_scroll_element) - time.sleep(SCROLL_PAUSE_TIME_REVIEW) - # 新しい高さを取得 - review_new_height = driver.execute_script("return arguments[0].scrollHeight", review_scroll_element) - - # 高さが変わらなくなったかチェック - if review_new_height == review_last_height: - review_stuck_count += 1 - if review_stuck_count >= REVIEW_SCROLL_STUCK_LIMIT: - print(f" 口コミスクロール高さが{REVIEW_SCROLL_STUCK_LIMIT}回変化なし。スクロール終了と判断。") - break - else: - # 停滞時は少し長く待って再試行 - time.sleep(SCROLL_PAUSE_TIME_REVIEW * 2) - else: - review_stuck_count = 0 # 高さが変わればリセット - - review_last_height = review_new_height - except Exception as e_review_scroll: - # 口コミスクロール中のエラー - print(f"★★★★★ 口コミスクロール中にエラー ★★★★★\n{type(e_review_scroll).__name__}: {e_review_scroll}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") - print(" 口コミスクロールエラー発生。可能な範囲で続行します。") - break # スクロールエラー時はループ中断 - review_scroll_attempts += 1 - if review_scroll_attempts >= MAX_SCROLL_ATTEMPTS_REVIEW: - print(f" 最大口コミスクロール回数 ({MAX_SCROLL_ATTEMPTS_REVIEW}) 到達。") - print(" 口コミエリアのスクロール完了。") - elif review_tab_clicked: - # 口コミタブはクリックできたが、スクロール要素が見つからなかった場合 - print(" 警告: 口コミスクロール要素が見つからなかったため、口コミスクロールをスキップします。") - - - # --- 「もっと見る」ボタンをクリック --- - if review_tab_clicked and review_scroll_element: # 口コミタブがクリックされ、スクロール要素が見つかっている場合のみ実行 - print(" 「もっと見る」ボタンを検索してクリック試行...") - more_buttons_xpath = "//button[contains(text(), 'もっと見る')]" # 「もっと見る」ボタンのXPath (要UI確認) - clicked_count = 0 - click_attempts = 0 - max_click_attempts = 3 # 複数回試行(動的追加対策) - while click_attempts < max_click_attempts: - buttons_found_this_round = 0 - try: - # 現在表示されている「もっと見る」ボタンを全て取得 - more_buttons = driver.find_elements(By.XPATH, more_buttons_xpath) - if not more_buttons: - # ボタンが見つからない場合 - if click_attempts == 0: print(" 「もっと見る」ボタンが見つか��ませんでした。") - else: print(f" 追加の「もっと見る」ボタンは見つかりませんでした (試行 {click_attempts+1}/{max_click_attempts})。") - break # ループ終了 - - print(f" 「もっと見る」ボタンを {len(more_buttons)} 個発見 (試行 {click_attempts+1}/{max_click_attempts})。クリック開始...") - # 見つかったボタンを順番にクリック - for btn_idx, button in enumerate(more_buttons): - try: - # ボタンが表示されていてクリック可能か確認 - if button.is_displayed() and button.is_enabled(): - # JavaScriptでクリック (クリック妨害対策) - driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", button) # ボタンを画面中央へ - time.sleep(0.2) # スクロール待機 - driver.execute_script("arguments[0].click();", button) - clicked_count += 1 - buttons_found_this_round += 1 - time.sleep(0.3) # クリック後のDOM変化/読み込み待ち - except ElementClickInterceptedException: - # クリックが遮られた場合 - print(f" ボタン {btn_idx+1} のクリックが遮られました。スキップします。") - except StaleElementReferenceException: - # 要素が古くなった場合 - print(f" ボタン {btn_idx+1} が古くなりました。スキップします。") - except Exception as e_click_more: - # その他のクリックエラー - print(f" ボタン {btn_idx+1} のクリック中にエラー: {e_click_more}") - print(f" 今回の試行で {buttons_found_this_round} 個の「もっと見る」ボタンをクリックしました。") - # 今回クリックできるボタンがなければ終了 - if buttons_found_this_round == 0: - print(" これ以上クリックできる「もっと見る」ボタンはありませんでした。") - break - - except Exception as e_find_more: - # 「もっと見る」ボタン検索中のエラー - print(f"★★★★★ 「もっと見る」ボタン検索中にエラー ★★★★★\n{type(e_find_more).__name__}: {e_find_more}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") - break # 検索エラー時はループ終了 - click_attempts += 1 - # 次の検索まで少し待つ (DOM更新待ち) - if click_attempts < max_click_attempts: - time.sleep(1.0) - - # クリック結果のサマリー - if clicked_count > 0: - print(f" 合計 {clicked_count} 個の「もっと見る」ボタンをクリックしました。") - else: - print(" クリックされた「もっと見る」ボタンはありませんでした。") - # 全てのクリック処理後に少し待つ - time.sleep(WAIT_TIME_BASE * 0.5) - - # --- HTML取得と保存 (「もっと見る」クリック後) --- - print(" ページのHTMLを取得・保存中...") - detail_html_content = "" - try: - # 全ての操作が終わった後のHTMLを取得 - detail_html_content = driver.page_source - # ファイル名生成 - temp_name = 'N/A' - try: temp_name = driver.find_element(By.TAG_NAME, 'h1').text - except: pass # h1が見つからなくてもエラーにしない - safe_place_name_part = re.sub(r'[\\/*?:"<>|]', '_', temp_name)[:20].strip() or "no_name" - tab_suffix = "_reviews_expanded" if review_tab_clicked else "_overview" # ファイル名で状態を示す - detail_html_fname = f"Q{query_index:03d}_R{i:03d}_{safe_place_name_part}_{safe_query_part}_detail{tab_suffix}.html" - detail_html_path = os.path.join(output_dir, detail_html_fname) - # ファイル書き込み - with open(detail_html_path, 'w', encoding='utf-8') as f: - f.write(detail_html_content) - result_details['html_filename'] = detail_html_fname - print(f" HTMLを保存しました: {detail_html_fname}") - except Exception as e_save_html: - # HTML保存エラー - print(f" HTML取得/保存エラー: {e_save_html}") - result_details['html_filename'] = 'Error Saving HTML' - - # --- HTML解析 (本文抽出ロジックは関数側で変更済み) --- - if detail_html_content: - print(" HTMLを解析して情報を抽出中...") - extracted_info = extract_details_and_reviews_from_html(detail_html_content) - # 抽出結果をマージ - result_details.update(extracted_info) - # ステータス設定 - if result_details.get('extraction_error'): - result_details['status'] = f"Warning: HTML Extraction Error" # 抽出エラーは警告 + log_output_list.append(f"一覧ページ {page_num} で会社が見つかりませんでした。最終ページのようです。") + break + + log_output_list.append(f"一覧ページ {page_num} で {len(companies_on_page)} 件検出。詳細情報取得...") + print(f"一覧ページ {page_num} で {len(companies_on_page)} 件検出。詳細情報取得...") + + # --- 6. 会社詳細ループ (ページ内の会社ごと) --- + desc = f"ページ {page_num}: 詳細取得&保存中" + # tqdm自体のキャンセルは難しいので、ループ内でチェック + for i, company_base_info in enumerate(companies_on_page): + # --- 停止チェック (各会社の処理開始前) --- + if stop_requested: + log_output_list.append("\n===== 停止リクエストにより現在のページの処理を中断します =====") + break # このページの残りの会社の処理を中断 + + total_companies_processed += 1 + company_log = f"\n--- 会社 #{total_companies_processed}: {company_base_info.get('会社名', '名前不明')} ---" + print(company_log) # ログリストには追加しない + + detail_url = company_base_info.get("Pitact 詳細URL") + + if detail_url and detail_url != "取得失敗": + detail_html, detail_fetch_log = get_html_with_driver(driver, detail_url, log_output_list, wait_time=3) + + # --- 停止チェック (詳細HTML取得後) --- + if stop_requested: break + + if detail_html: + extracted_details, detail_parse_log = parse_detail_page(detail_html, log_output_list) + # --- 停止チェック (詳細解析後) --- + if stop_requested: break + + if "住所" in extracted_details and extracted_details["住所"] != "取得失敗": + company_base_info["住所"] = extracted_details["住所"] + if "ホームページURL" in extracted_details: + company_base_info["ホームページURL"] = extracted_details["ホームページURL"] + if "設立日" in extracted_details: + company_base_info["設立日"] = extracted_details["設立日"] else: - result_details['status'] = 'Success' - print(" HTML解析完了。") - else: - # HTMLが取得できなかった場合 - print(" エラー: HTMLコンテンツが空のため、情報抽出をスキップします。") - result_details['status'] = 'Error: Empty HTML Content' - - # 詳細ページ処理中の各種例外ハンドリング - except TimeoutException as e_timeout_detail: - print(f"★★★★★ 詳細ページ読み込みタイムアウト ★★★★★\nURL: {place_url}\n{e_timeout_detail}") - # ★★★ SyntaxError修正 ★★★ - print("--- HTML Snapshot (Timeout) ---") - try: - print(driver.page_source[:1000]) - except Exception as e_snapshot: - print(f" ページソース取得失敗: {e_snapshot}") - print("--- End Snapshot ---") - result_details['status'] = f'Error: Detail Page Timeout'; result_details['name'] = f"Error (Timeout R:{i})" - except NoSuchElementException as e_nse: - # 主にh1が見つからない��ースなど - print(f"★★★★★ 詳細ページで必須要素(h1など)が見つかりません ★★★★★\nURL: {place_url}\n{e_nse}") - # ★★★ SyntaxError修正 ★★★ - print("--- HTML Snapshot (NSE) ---") - try: - print(driver.page_source[:1000]) - except Exception as e_snapshot: - print(f" ページソース取得失敗: {e_snapshot}") - print("--- End Snapshot ---") - result_details['status'] = f'Error: Detail Page Missing Element (e.g., h1)'; result_details['name'] = f"Error (ElementNotFound R:{i})" - except Exception as e_detail: - # その他の予期せぬエラー - print(f"★★★★★ 詳細ページ処理中に予期せぬエラー ★★★★★\nURL: {place_url}\n{type(e_detail).__name__}: {e_detail}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") - result_details['status'] = f'Error: Detail Page Exception - {type(e_detail).__name__}'; result_details['name'] = f"Error (Exception R:{i})" - finally: - # 処理結果をリストに追加 - results_list.append(result_details) - - except Exception as e_main_query: - # クエリ単位での大きなエラー - print(f"★★★★★ クエリ '{query}' [Index:{query_index}] の処理全体でエラー ★★★★★\n{type(e_main_query).__name__}: {e_main_query}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") - results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': 'N/A', 'html_filename': 'N/A', 'name': f'Error (Overall Query {query_index})', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: Query Level Exception - {type(e_main_query).__name__}'}) - finally: - # クエリ処理終了ログ - print(f"--- クエリ処理終了 [Index:{query_index}] - {len(results_list)} 件の結果 ---") - return results_list - - -# --- Gradio Processing Function --- -def run_scraping(input_csv_file, output_dir_name, output_csv_name, csv_encoding, - wait_time_base, wait_time_detail, wait_time_search, headless_mode): - """Gradioインターフェースから呼び出されるメイン処理関数""" - log_stream = io.StringIO() # ログ出力用 - start_time_total = time.time() # 全体処理時間計測開始 - driver = None # WebDriverオブジェクト初期化 - processed_query_count = 0 # 処理済みクエリ数 - total_results_count = 0 # CSV書き込み総行数 - total_queries = 0 # 総クエリ数 - output_csv_path = None # 出力CSVファイルパス - - # 標準出力と標準エラー出力をログストリームにリダイレクト - with contextlib.redirect_stdout(log_stream), contextlib.redirect_stderr(log_stream): - try: - print("=== 処理開始 ===") - print(f"開始時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - # 入力ファイルチェック - if input_csv_file is None: - print("エラー: CSVファイルが選択されていません。処理を中断します。") - yield log_stream.getvalue(), None # ログと空の結果を返す - return - yield log_stream.getvalue(), None # 初期ログをUIに反映 - - # パラメータ設定 - SEARCH_QUERIES_CSV_PATH = input_csv_file.name - OUTPUT_DIR = output_dir_name.strip() or "html_reviews_expanded" # HTML保存ディレクトリ - OUTPUT_CSV_FILENAME = output_csv_name.strip() or "結果_reviews_expanded.csv" # 出力CSV名 - CSV_ENCODING = csv_encoding # CSVエンコーディング - # 待機時間設定 (不正値チェック含む) - try: - wait_config = { - 'base': max(1.0, float(wait_time_base)), - 'detail': max(10.0, float(wait_time_detail)), - 'search': max(5.0, float(wait_time_search)) - } - except ValueError: - print("警告: 待機時間に無効な値が入力されました。デフォルト値を使用します。") - wait_config = {'base': 4.0, 'detail': 20.0, 'search': 15.0} - print(f"待機時間設定: 基本={wait_config['base']}秒, 詳細/口コミ={wait_config['detail']}秒, 検索={wait_config['search']}秒") - yield log_stream.getvalue(), None - - # 出力ディレクトリ設定と作成 - if not os.path.isabs(OUTPUT_DIR): # 相対パスなら絶対パスに変換 - OUTPUT_DIR = os.path.join(os.getcwd(), OUTPUT_DIR) - output_csv_path = os.path.join(OUTPUT_DIR, OUTPUT_CSV_FILENAME) - print(f"HTML出力先ディレクトリ: {OUTPUT_DIR}") - print(f"CSV出力先ファイル: {output_csv_path}") - os.makedirs(OUTPUT_DIR, exist_ok=True) # ディレクトリが存在しなければ作成 - yield log_stream.getvalue(), None - - # CSVからクエリ読み込み - queries = load_queries(SEARCH_QUERIES_CSV_PATH) - yield log_stream.getvalue(), None # ログ更新 - if not queries: - print("エラー: CSVから処理可能なクエリが見つかりませんでした。処理を終了します。") - yield log_stream.getvalue(), None - return - total_queries = len(queries) - print(f"{total_queries} 件のクエリを処理します。") - yield log_stream.getvalue(), None - - # WebDriver初期化 - print("\nWebDriver初期化中...") - yield log_stream.getvalue(), None - options = Options() - options.add_argument('--no-sandbox') - options.add_argument('--disable-dev-shm-usage') - options.add_argument('--lang=ja-JP') # 日本語指定 - options.add_argument("--window-size=1920,1080") # ウィンドウサイズ - options.add_argument('--disable-extensions') # 拡張機能無効化 - options.add_argument('--disable-blink-features=AutomationControlled') # 自動化検出回避策 - options.add_argument('--disable-gpu') # GPU無効化(安定性向上目的) - # 自動化フラグを隠すオプション - options.add_experimental_option('excludeSwitches', ['enable-automation']) - options.add_experimental_option('useAutomationExtension', False) - # パスワード保存ダイアログなどを無効化 - options.add_experimental_option("prefs", { - "credentials_enable_service": False, - "profile.password_manager_enabled": False - }) - # ヘッドレスモード設定 - if headless_mode: - print(" ヘッドレスモードで実行します。") - options.add_argument('--headless=new') # 新しいヘッドレスモード - else: - print(" 通常モード (非ヘッドレス) で実行します。") + log_output_list.append(f"警告: {company_base_info.get('会社名', '不明')} 詳細ページ取得失敗。") + company_base_info.setdefault("ホームページURL", "取得失敗") + company_base_info.setdefault("設立日", "取得失敗") - # WebDriverインスタンス生成 - try: - if IN_COLAB and gs: - # Colab環境 - print(" Colab環境でgoogle_colab_seleniumを使用します。") - driver = gs.Chrome(options=options) - elif not IN_COLAB and ChromeService and ChromeDriverManager: - # ローカル環境 (webdriver-manager使用) - try: - print(" webdriver-managerを使用してChromeDriverパスを解決します...") - service = ChromeService(ChromeDriverManager().install()) - driver = webdriver.Chrome(service=service, options=options) - print(" ChromeDriver (webdriver-manager) 起動成功。") - except Exception as e_wdm: - # webdriver-manager失敗時のフォールバック - print(f" webdriver-managerでの初期化エラー: {e_wdm}") - print(" PATH上のChromeDriverで試行します...") - driver = webdriver.Chrome(options=options) # PATH上のドライバで再試行 - print(" ChromeDriver (PATH) 起動成功。") - elif not IN_COLAB: - # ローカル環境 (webdriver-managerなし、PATH上のドライバ使用) - print(" PATH上のChromeDriverを使用します...") - driver = webdriver.Chrome(options=options) - print(" ChromeDriver (PATH) 起動成功。") + time.sleep(0.4) else: - # 上記いずれでもない場合 (通常は発生しない) - raise Exception("WebDriverを初期化できませんでした。適切なWebDriver設定が見つかりません。") - - # 暗黙的な待機 (要素が見つかるまで指定秒数待つ) - driver.implicitly_wait(3) - print("WebDriver初期化完了。") - except Exception as e_wd_init: - # WebDriver初期化失敗時のエラー処理 - print(f"★★★★★ WebDriver初期化失敗 ★★★★★") - print(f"エラータイプ: {type(e_wd_init).__name__}") - print(f"エラーメッセージ: {e_wd_init}") - print("--- スタックトレース ---") - print(traceback.format_exc()) - print("----------------------") - print("ヒント: ChromeDriverのバージョンとChromeブラウザのバージョンが一致しているか確認してください。") - if not IN_COLAB: print(" `webdriver-manager`がインストールされていない場合は `pip install webdriver-manager` を試してください。") - yield log_stream.getvalue(), None # ログと空の結果を返す - return # WebDriverがないと処理継続不可 - yield log_stream.getvalue(), None - - # CSVヘッダー定義 (「もっと見る」クリック版) - csv_header = ['QueryIndex', 'OriginalQuery', 'ResultRank', 'Status', 'ExtractedName', - 'ExtractedWebsite', 'ExtractedPhone', 'ExtractedAddress', 'Reviews', # 口コミ列 - 'ExtractionError', # HTML抽出エラー列 - 'PlaceURL', 'DetailHTMLFilename'] - # CSVファイルモード設定 (追記 'a' or 新規 'w') - file_exists = os.path.exists(output_csv_path) - file_mode = 'a' if file_exists and os.path.getsize(output_csv_path) > 0 else 'w' - print(f"CSVファイルを '{file_mode}' モードで開きます (エンコーディング: {CSV_ENCODING})。") - yield log_stream.getvalue(), None - - # CSVファイル処理 (メインループ) + log_output_list.append(f"警告: {company_base_info.get('会社名', '不明')} 詳細URL不明。") + company_base_info.setdefault("ホームページURL", "取得失敗") + company_base_info.setdefault("設立日", "取得失敗") + + # --- 停止チェック (CSV保存前) --- + if stop_requested: break + + # --- 7. ローカルCSVへ追記 --- + success_csv, csv_log = append_to_csv(company_base_info, local_output_path, CSV_HEADERS, log_output_list) + + # --- 8. Google Driveへ同期 --- + if success_csv and gdrive_output_path: + # --- 停止チェック (Drive同期前) --- + if stop_requested: break + success_sync, sync_log = sync_file_to_drive(local_output_path, gdrive_output_path, log_output_list) + # 同期失敗時のログは sync_file_to_drive 内で追加される + + # メモリ上のリストにも追加 + all_company_data_memory.append(company_base_info) + + # 内側のループがbreakされた場合、外側のループもbreakする + if stop_requested: + break + + page_num += 1 + time.sleep(0.8) + + except Exception as e: + error_detail = traceback.format_exc() + log_output_list.append("\n!!!処理中に予期せぬエラーが発生しました!!!") + log_output_list.append(f"エラータイプ: {type(e).__name__}") + log_output_list.append(f"エラーメッセージ: {e}") + log_output_list.append("詳細:") + log_output_list.append(error_detail) + print("!!!処理中に予期せぬエラーが発生しました!!!") + print(error_detail) + finally: + if driver: try: - # CSVファイルを指定モード・エンコーディングで開く - with open(output_csv_path, file_mode, newline='', encoding=CSV_ENCODING, errors='replace') as csv_file: - writer = csv.writer(csv_file) - # 新規ファイルの場合、ヘッダー行を書き込む - if file_mode == 'w': - print(" 新規CSVファイルのためヘッダー行を書き込みます。") - writer.writerow(csv_header) - csv_file.flush() # ヘッダー書き込み後すぐにフラッシュ - elif file_exists: - print(f" 既存ファイル '{os.path.basename(output_csv_path)}' に追記します。") - - # クエリごとに処理を実行 - for i, query in enumerate(queries, 1): - start_time_query = time.time() # クエリ処理時間計測開始 - print(f"\n===== クエリ {i}/{total_queries} 開始: '{query}' =====") - yield log_stream.getvalue(), None # 各クエリ開始前にログ更新 - - # --- 単一クエリのスクレイピング処理実行 --- - results = process_single_query_full_list(driver, query, i, OUTPUT_DIR, wait_config) - yield log_stream.getvalue(), None # クエリ処理後���ログ更新 - - # --- 取得結果をCSVに書き込み --- - written_count_query = 0 - print(f" クエリ {i} の結果をCSVに書き込み中...") - for result_data in results: - try: - # レビューデータを整形して文字列にする - reviews_list = result_data.get('reviews', []) - formatted_reviews = "" - if isinstance(reviews_list, list) and reviews_list: - review_texts = [] - for idx, review_item in enumerate(reviews_list): - # 各レビューをフォーマット - if isinstance(review_item, dict): - r_text = str(review_item.get('text', '')).replace('\n', ' ').replace('\r', '') # 改行をスペースに - review_texts.append( - f"[{idx+1}] 投稿者: {review_item.get('reviewer', 'N/A')} | " - f"評価: {review_item.get('rating', 'N/A')} | " - f"本文: {r_text}" - ) - elif isinstance(review_item, str): # 予期せぬ形式の場合 - review_texts.append(f"[{idx+1}] {review_item.replace('n', ' ').replace('r', '')}") - # 各レビューを改行2つで結合 - formatted_reviews = "\n\n".join(review_texts) - elif isinstance(reviews_list, str): # 既に文字列の場合 - formatted_reviews = reviews_list.replace('\n', ' ').replace('\r', '') - - # HTML抽出エラーメッセージを取得 (長すぎる場合は短縮) - extraction_error_msg = result_data.get('extraction_error', '') - if extraction_error_msg and len(extraction_error_msg) > 500: - extraction_error_msg = extraction_error_msg[:250] + "..." + extraction_error_msg[-250:] - - # CSVに書き込む行データを作成 (ヘッダー順に合わせる) - row_data = [ - result_data.get('query_index', i), - result_data.get('original_query', query), - result_data.get('result_rank', 'N/A'), - result_data.get('status', 'Unknown'), - result_data.get('name', 'N/A'), - result_data.get('url', ''), - result_data.get('phone', 'N/A'), - result_data.get('address', 'N/A'), - formatted_reviews, # 整形済みレビュー - extraction_error_msg, # HTML抽出エラー - result_data.get('place_url', 'N/A'), - result_data.get('html_filename', 'N/A') - ] - # CSVファイルに書き込み - writer.writerow(row_data) - written_count_query += 1 - except Exception as e_write: - # CSV書き込み中のエラー処理 - print(f"★★★★★ CSV書き込み中にエラーが発生しました (行スキップ) ★★★★★") - print(f"エラーデータ (一部): {str(result_data)[:200]}...") # エラー原因データを一部表示 - print(f"エラータイプ: {type(e_write).__name__}: {e_write}") - - csv_file.flush() # 各クエリ処理後にファイルに書き出す - total_results_count += written_count_query - processed_query_count += 1 - end_time_query = time.time() # クエリ処理時間計測終了 - print(f"===== クエリ {i}/{total_queries} 完了 - {written_count_query}件書き込み, 所要時間: {end_time_query - start_time_query:.2f} 秒 =====") - yield log_stream.getvalue(), None # 各クエリ完了後にログ更新 - - # --- クエリ間の待機 --- - if i < total_queries: # 最後のクエリの後には待機しない - # ランダム性を持たせた待機時間 - sleep_duration = wait_config['base'] * 1.5 + (hash(query + str(i)) % (wait_config['base'] * 1.5)) - sleep_duration = max(wait_config['base'] * 0.8, min(sleep_duration, wait_config['base'] * 4.0)) - print(f"次のクエリまで {sleep_duration:.2f} 秒待機します...") - yield log_stream.getvalue(), None - time.sleep(sleep_duration) - else: - print("\n全クエリの処理が完了しました。") - - except IOError as e_io: - # CSVファイルオープン/書き込み時のIOエラー - print(f"★★★★★ CSVファイル '{output_csv_path}' のオープン/書き込み中にIOエラー ★★★★★") - print(f"エラータイプ: {type(e_io).__name__}") - print(f"エラーメッセージ: {e_io}") - print("--- スタックトレース ---") - print(traceback.format_exc()) - print("----------------------") - print("ファイルが他のプログラムで開かれていないか、書き込み権限があるか確認してください。") - output_csv_path = None # 結果ファイルパスを無効化 - except Exception as e_csv_loop: - # CSV処理ループ中の予期せぬエラー - print(f"★★★★★ CSV処理ループ中に予期せぬエラー ★★★★★") - print(f"エラータイプ: {type(e_csv_loop).__name__}") - print(f"エラーメッセージ: {e_csv_loop}") - print("--- スタックトレース ---") - print(traceback.format_exc()) - print("----------------------") - - - except Exception as e_main: - # run_scraping関数全体での予期せぬエラー - print(f"\n★★★★★ メイン処理 (run_scraping) 中に予期せぬエラーが発生しました ★★★★★") - print(f"エラータイプ: {type(e_main).__name__}") - print(f"エラーメッセージ: {e_main}") - print("\n--- スタックトレース ---") - print(traceback.format_exc()) # スタックトレースをログに出力 - print("----------------------") - # エラー発生時も、可能な限りログと途中までのCSVを返す - yield log_stream.getvalue(), output_csv_path if output_csv_path and os.path.exists(output_csv_path) else None - return # 処理終了 - - finally: - # --- 終了処理 --- - if driver: - print("\nWebDriver終了処理中...") - try: - driver.quit() # ブラウザを閉じる - print("WebDriver正常終了。") - except Exception as e_quit: - # WebDriver終了時のエラー - print(f"★★★★★ WebDriver終了時にエラー ★★★★★") - print(f"エラータイプ: {type(e_quit).__name__}: {e_quit}") - - # 全体処理時間計測終了と結果表示 - end_time_total = time.time() - total_duration_seconds = end_time_total - start_time_total - print(f"\n=== 全処理終了 ===") - print(f"終了時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - print(f"処理完了クエリ数: {processed_query_count}/{total_queries if total_queries > 0 else 'N/A'} 件") - print(f"CSV書き込み総行数: {total_results_count} 件") - print(f"総処理時間: {total_duration_seconds:.2f} 秒 ({total_duration_seconds/60:.2f} 分)") - - # 最終ログを取得 - final_log = log_stream.getvalue() - - # 結果CSVファイルの存在確認とGradioへの出力設定 - if output_csv_path and os.path.exists(output_csv_path) and os.path.getsize(output_csv_path) > 0: - print(f"結果CSVファイル: {output_csv_path}") - # GradioのFileコンポーネントにファイルパスを渡してダウンロード可能にする - yield final_log, gr.File(value=output_csv_path, label="結果CSVダウンロード") - elif output_csv_path: - # ファイルパスはあるが、空か存在しない場合 - print(f"警告: 結果CSVファイル '{output_csv_path}' は空または存在しません。") - yield final_log, None + driver.quit() + log_output_list.append("\n===== WebDriverを終了しました =====") + print("\n===== WebDriverを終了しました =====") + except Exception as e_quit: + log_output_list.append(f"WebDriver終了時にエラーが発生しました: {e_quit}") + print(f"WebDriver終了時にエラーが発生しました: {e_quit}") + + # --- 9. 最終処理と結果返却 --- + if stop_requested: + log_output_list.append("===== 処理はユーザーリクエストにより中断されました =====") + else: + log_output_list.append("\n===== 処理完了 =====") + + final_df = pd.DataFrame() + local_file_ref = None + drive_file_ref = None # DriveファイルはGradio Fileで返さない + + if all_company_data_memory: + processed_count = len(all_company_data_memory) + log_output_list.append(f"合計 {processed_count} 件の会社データを収集し、") + log_output_list.append(f"ローカル '{os.path.basename(local_output_path)}' に保存しました。") + if gdrive_output_path and os.path.exists(gdrive_output_path): + log_output_list.append(f"Google Drive '{gdrive_output_path.replace(DRIVE_MOUNT_POINT, 'MyDrive:/', 1)}' にも同期されました。") + elif gdrive_output_path: + log_output_list.append(f"Google Drive '{os.path.basename(gdrive_output_path)}' への最終同期に失敗または中断された可能性があります。") + + df = pd.DataFrame(all_company_data_memory) + display_columns = ["会社名", "住所", "ホームページURL", "設立日"] + final_df = pd.DataFrame(columns=display_columns) + for col in display_columns: + if col in df.columns: + final_df[col] = df[col] else: - # ファイルパス自体がない場合 (IOErrorなど) - print("結果CSVファイルは生成されませんでした。") - yield final_log, None - + final_df[col] = "取得失敗" + + print(f"合計 {processed_count} 件の会社データを収集完了。") + + if os.path.exists(local_output_path): + local_file_ref = local_output_path + if gdrive_output_path and os.path.exists(gdrive_output_path): + log_output_list.append(f"DriveファイルはGoogle Driveから直接ダウンロードしてください: {gdrive_output_path.replace(DRIVE_MOUNT_POINT, 'MyDrive:/', 1)}") + # drive_file_ref は None のまま + + elif os.path.exists(local_output_path): + log_output_list.append(f"会社データは収集されませんでしたが、ローカルファイル '{os.path.basename(local_output_path)}' は作成/初期化されました。") + print(f"会社データは収集されませんでしたが、ローカルファイル '{os.path.basename(local_output_path)}' は作成/初期化されました。") + local_file_ref = local_output_path + else: + log_output_list.append("会社データは収集されず、ローカルファイルも作成されませんでした。") + print("会社データは収集されず、ローカルファイルも作成されませんでした。") + + final_log_output = "\n".join(log_output_list) + # 戻り値: DataFrame, ローカルファイルパス(or None), Driveファイルパス(常にNone), ログ文字列 + return final_df, local_file_ref, None, final_log_output + +# --- CSVダウンロード用関数 (変更なし) --- +def download_local_csv(): + """ + ローカルの一時CSVファイルが存在すれば、そのパスを返す。 + """ + if os.path.exists(LOCAL_CSV_FILENAME): + return LOCAL_CSV_FILENAME + else: + print("ローカルCSVファイルが見つかりません。") + return None -# --- Gradio UI 定義 --- +# --- Gradio Interface (停止ボタン追加) --- with gr.Blocks(theme=gr.themes.Soft()) as demo: - gr.Markdown("# Google Maps スクレイピング (口コミ全件表示試行)") # タイトル gr.Markdown( """ - CSVクエリで検索し、詳細ページで「クチコミ」タブをクリック後、口コミエリアを**最後までスクロール**し、 - さらに**「もっと見る」ボタンを全てクリック**して全件表示を試みます。 - その後、基本情報と口コミ情報(`span.wiI7pd`優先)を抽出し、CSVに出力します。 - **処理フロー:** - 1. クエリ検索 → リストスクロール → リンク抽出。 - 2. 詳細ページ遷移 → **「クチコミ」タブクリック** → 口コミコンテナ待機。 - 3. **口コミエリアを最後までスクロール**。 - 4. **「もっと見る」ボタンを全てクリック** (複数回試行)。 - 5. HTML取得 → **bs4**で解析 (基本情報: `.aIFcqe`優先, 口コミ本文: `span.wiI7pd`優先)。 - 6. 結果をCSVに出力(HTMLも保存)。 - - **注意:** 「もっと見る」ボタンの挙動やセレクタは変わりやすいです。エラーログを確認し、コード修正が必要になる場��があります。 + # Pitact 企業情報 スクレイピング ツール (Drive逐次同期 & 停止機能付き) + 指定されたPitactの一覧ページURLから企業情報を収集し、ローカルおよびGoogle Drive上のCSVに**1件ずつ**保存します。 + **Colab環境での実行を推奨します。** Drive同期機能はColabでのみ有効です。 + 処理中に「処理停止」ボタンを押すと、現在の処理がキリの良いところで中断されます。 """ - ) # 説明文 - + ) with gr.Row(): + with gr.Column(scale=3): + url_input = gr.Textbox( + label="Pitact 市区町村別一覧 URL", + placeholder="例: https://pitact.com/search/pref-hokkaido/city-1101", + value="https://pitact.com/search/pref-hokkaido/city-1101" + ) + drive_filename_input = gr.Textbox( + label="Google Drive 保存ファイル名 (Colab環境のみ有効)", + placeholder="例: Pitact_結果.csv (MyDrive直下に保存)", + value="Pitact_Scraping_Result.csv" + ) + with gr.Row(): + start_button = gr.Button("スクレイピング開始", variant="primary", scale=2) + stop_button = gr.Button("処理停止", variant="stop", scale=1) # 停止ボタンを追加 with gr.Column(scale=2): - gr.Markdown("### ① 入力ファイルと出力設定") - input_csv_file = gr.File(label="検索クエリCSVファイル (1列目のみ使用)", file_types=[".csv"]) - output_dir_name = gr.Textbox(label="HTML保存先ディレクトリ名", value="html_reviews_expanded") - output_csv_name = gr.Textbox(label="出力CSVファイル名", value="結果_reviews_expanded.csv") - csv_encoding = gr.Dropdown(label="出力CSVエンコーディング", choices=['utf-8-sig', 'cp932'], value='utf-8-sig') - headless_mode = gr.Checkbox(label="ヘッドレスモードで実行 (エラー発生時はOFF推奨)", value=True) - with gr.Column(scale=1): - gr.Markdown("### ② 待機時間設定 (秒)") - wait_time_base = gr.Number(label="基本待機", minimum=1, maximum=20, step=0.5, value=4) - wait_time_detail = gr.Number(label="詳細/口コミ最大待機", minimum=10, maximum=60, step=1, value=25) - wait_time_search = gr.Number(label="検索リスト最大待機", minimum=5, maximum=60, step=1, value=15) - - start_button = gr.Button("処理開始", variant="primary", size="lg") - gr.Markdown("### ③ 処理ステータスとエラーログ") - # ログ表示エリアの設定 (行数、スクロール) - status_textbox = gr.Textbox(label="ログ", lines=25, interactive=False, autoscroll=True, max_lines=2000) - gr.Markdown("### ④ 結果ダウンロード") - # 結果ファイルダウンロード用コンポーネント - output_csv_download = gr.File(label="結果CSVダウンロード", interactive=False) - - # ボタンクリック時の動作設定 - start_button.click( - fn=run_scraping, # 実行する関数 - inputs=[input_csv_file, output_dir_name, output_csv_name, csv_encoding, - wait_time_base, wait_time_detail, wait_time_search, headless_mode], # 入力コンポーネント - outputs=[status_textbox, output_csv_download] # 出力コンポーネント + log_output_display = gr.Textbox( + label="処理ログ", + lines=20, + interactive=False, + autoscroll=True + ) + # 停止ボタンのフィードバック用 (非表示でも良い) + stop_feedback = gr.Textbox(label="停止ステータス", interactive=False, visible=True) + + + gr.Markdown("---") + gr.Markdown("## 収集結果プレビュー") + dataframe_output = gr.DataFrame( + label="収集データ(会社名, 住所, HP, 設立日) - 最大100件程度表示", + wrap=True, + max_rows=100 + ) + gr.Markdown("---") + gr.Markdown("## ダウンロード") + with gr.Row(): + download_button_local = gr.Button("ローカル保存CSVをダウンロード") + download_file_output_local = gr.File(label="ローカルCSVファイル") + + # 開始ボタンのイベント + start_event = start_button.click( + fn=scrape_pitact_gradio, + inputs=[url_input, drive_filename_input], + # 戻り値: DataFrame, local_file_path, None, log_string + outputs=[dataframe_output, download_file_output_local, gr.Textbox(visible=False), log_output_display] + ) + + # 停止ボタンのイベント + stop_button.click( + fn=request_stop, + inputs=[], + outputs=[stop_feedback], # 停止ステータス表示用のTextboxに出力 + cancels=[start_event] # 開始イベントをキャンセルしよ���と試みる (効果は限定的かも) + ) + + + # ダウンロードボタンのイベント + download_button_local.click( + fn=download_local_csv, + inputs=[], + outputs=[download_file_output_local] + ) + gr.Markdown( + """ + **注意:** + - 「処理停止」ボタンを押しても、現在実行中のネットワークアクセスやファイル書き込みが完了するまで、停止に時間がかかる場合があります。 + - Google Driveへの保存は、処理中に1件ずつ行われます。完了時にDrive上のファイルが最新の状態になります。 + - **Google Drive上のファイルは、この画面から直接ダウンロードできません。** Google Driveのウェブサイトやアプリからアクセスしてください。 + - 長時間実行すると、タイムアウトやリソース制限により中断される可能性があります。 + - スクレイピングは対象サイトの利用規約を確認し、負荷をかけすぎないよう注意して実行してください。 + """ ) -# --- UI起動 --- -print("Gradio UIを起動します...") -# queue()で複数ユーザー対応、share=IN_COLABでColab環境なら共有リンク生成 -demo.queue().launch(share=True, debug=False) \ No newline at end of file +# Gradioアプリの起動 +if __name__ == "__main__": + demo.launch(debug=True, share=True)#test_main \ No newline at end of file