Spaces:
Sleeping
Sleeping
import gradio as gr | |
import os | |
import csv | |
import json | |
import time | |
from bs4 import BeautifulSoup | |
from selenium.webdriver.chrome.options import Options | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException, ElementClickInterceptedException | |
import re | |
from urllib.parse import urlparse, urljoin | |
import traceback | |
import io | |
import contextlib | |
from datetime import datetime | |
import threading # スレッド中断のために追加 | |
# --- WebDriverの選択 --- | |
IN_COLAB = 'google.colab' in str(get_ipython()) if 'get_ipython' in globals() else False | |
if IN_COLAB: | |
print("Google Colab環境を検出。google_colab_selenium を使用します。") | |
try: import google_colab_selenium as gs | |
except ImportError: print("google_colab_seleniumが見つかりません。!pip install google-colab-selenium を実行してください。"); gs = None | |
else: | |
print("ローカル環境を検出。通常の selenium webdriver を使用します。") | |
from selenium import webdriver | |
gs = None | |
try: | |
from selenium.webdriver.chrome.service import Service as ChromeService | |
from webdriver_manager.chrome import ChromeDriverManager | |
except ImportError: | |
print("webdriver-manager が見つかりません。 `pip install webdriver-manager` を実行してください。") | |
ChromeService = None | |
ChromeDriverManager = None | |
# --- 中断フラグ --- | |
# スレッドセーフな中断イベントを使用 | |
interrupt_event = threading.Event() | |
# --- Helper Functions --- | |
def find_prefixed_data_string(data_structure): | |
"""データ構造内から ")]}'\n" で始まる文字列を見つける(再帰的検索)""" | |
if isinstance(data_structure, str) and data_structure.startswith(")]}'\n"): | |
return data_structure | |
elif isinstance(data_structure, list): | |
for item in data_structure: | |
if interrupt_event.is_set(): return None # 中断チェック | |
found = find_prefixed_data_string(item) | |
if found: | |
return found | |
elif isinstance(data_structure, dict): | |
for value in data_structure.values(): | |
if interrupt_event.is_set(): return None # 中断チェック | |
found = find_prefixed_data_string(value) | |
if found: | |
return found | |
return None | |
def find_details_data_by_id_or_heuristic(data_list, place_id=None): | |
""" | |
JSONデータリストから詳細情報を含む可能性のあるリストを特定する。 | |
place_idがあればそれを優先し、なければヒューリスティック(住所形式など)で探す。 | |
""" | |
if not isinstance(data_list, list): | |
return None | |
if interrupt_event.is_set(): return None # 中断チェック | |
potential_candidates = [] | |
for item in data_list: | |
if interrupt_event.is_set(): return None # 中断チェック | |
# 詳細データは通常、要素数が比較的多いリスト形式 | |
if not isinstance(item, list) or len(item) < 30: | |
continue | |
is_candidate = False | |
# place_id が指定されていれば、リスト内にそのIDが含まれるかチェック | |
if place_id and place_id in str(item): | |
is_candidate = True | |
# place_id がない場合は、住所らしき情報が含まれるかヒューリスティックにチェック | |
elif not place_id: | |
has_address_like = any( | |
isinstance(sub, str) and | |
("〒" in sub or | |
any(k in sub for k in ["都", "道", "府", "県", "市", "区", "町", "村", "丁目", "番地", "号"]) or | |
re.search(r'\d+-\d+-\d+', sub)) | |
for sub in item | |
) | |
if has_address_like: | |
is_candidate = True | |
if is_candidate: | |
potential_candidates.append(item) | |
if not potential_candidates: | |
return None | |
# 候補が1つならそれを返す | |
if len(potential_candidates) == 1: | |
return potential_candidates[0] | |
# 候補が複数ある場合、スコアリングで最もそれらしいものを選ぶ | |
best_candidate = None | |
max_score = -1 | |
for candidate in potential_candidates: | |
if interrupt_event.is_set(): return None # 中断チェック | |
score = len(candidate) # 要素数が多いほど詳細情報の可能性が高い | |
try: | |
# 特定のインデックスにリストが存在するか(構造的な特徴) | |
if any(isinstance(candidate[idx], list) and candidate[idx] for idx in [7, 13, 178] if idx < len(candidate)): | |
score += 50 | |
# URLらしき文字列が含まれるか | |
if 7 < len(candidate) and isinstance(candidate[7], list) and len(candidate[7]) > 0 and isinstance(candidate[7][0], str) and candidate[7][0].startswith('http'): | |
score += 50 | |
# 別の構造的な特徴 | |
if 34 < len(candidate) and isinstance(candidate[34], list) and candidate[34]: | |
score += 30 | |
except Exception: | |
# スコアリング中のエラーは無視 | |
pass | |
if score > max_score: | |
max_score = score | |
best_candidate = candidate | |
return best_candidate | |
def is_domain_like(text): | |
"""文字列がドメイン名らしい形式か簡易的に判定""" | |
if not isinstance(text, str): return False | |
text = text.strip().lower() | |
common_tlds = ['.com', '.jp', '.co.jp', '.net', '.org', '.info', '.biz'] | |
# URLスキーマ、パス、特殊文字、全角文字、IPアドレス形式、前後のドット、連続ドットは除外 | |
if re.search(r'^(https?|ftp)://|[/\\?#\s\u3000-\uFFFF:;@!$%^*()=+]', text): return False | |
if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', text): return False | |
if text.startswith('.') or text.endswith('.') or '..' in text: return False | |
# ドットを含み、一般的なTLDで終わるかチェック | |
return '.' in text and any(text.endswith(tld) for tld in common_tlds) | |
def safe_get(data, index, default=None): | |
"""ネストされたリストや辞書から安全に値を取得する""" | |
if isinstance(index, int): | |
try: | |
return data[index] if isinstance(data, list) and index < len(data) else default | |
except IndexError: | |
return default | |
elif isinstance(index, list): # インデックスのリストでネストされた要素を取得 | |
current = data | |
for idx in index: | |
if interrupt_event.is_set(): return default # 中断チェック | |
try: | |
if isinstance(current, list) and isinstance(idx, int) and idx < len(current): | |
current = current[idx] | |
elif isinstance(current, dict) and idx in current: | |
current = current[idx] | |
else: | |
return default # 途中でリスト/辞書でない、またはインデックス/キーが存在しない場合 | |
except (IndexError, KeyError, TypeError): | |
return default # その他の予期せぬエラー | |
return current | |
elif isinstance(index, str): # 文字列インデックスは辞書のキーとして扱う | |
return data.get(index, default) if isinstance(data, dict) else default | |
return default | |
# --- 中断チェック付き時間待機関数 --- | |
def interruptible_sleep(duration): | |
"""指定された時間待機するが、中断イベントが発生したら即座に終了する""" | |
interrupt_event.wait(timeout=duration) | |
# waitはタイムアウトするかイベントがセットされると戻る | |
# 呼び出し元で interrupt_event.is_set() をチェックする必要がある | |
# --- HTML抽出関数 (本文抽出を span.wiI7pd 優先に変更、中断チェック追加) --- | |
def extract_details_and_reviews_from_html(html_content): | |
"""詳細HTMLから基本情報と口コミ情報を抽出 (本文は span.wiI7pd 優先、中断チェックあり)""" | |
print(" [HTML Extractor - Details & Reviews (wiI7pd priority)] 開始") | |
soup = BeautifulSoup(html_content, 'lxml' if 'lxml' in globals() else 'html.parser') | |
details = {"name": "N/A", "url": "", "phone": "N/A", "address": "N/A", "links": {}, "reviews": [], "extraction_error": None} | |
try: | |
# --- 基本情報の抽出 --- | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
main_container_selector = '.aIFcqe' | |
main_container = soup.select_one(main_container_selector) | |
search_root = soup # デフォルトはページ全体 | |
if main_container: | |
print(f" '{main_container_selector}' コンテナ発見。基本情報を抽出。") | |
search_root = main_container | |
else: | |
print(f" 警告: '{main_container_selector}' コンテナが見つかりません。ページ全体から基本情報を抽出。") | |
# 名前 (h1タグを探す) | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
name_tag = search_root.find('h1') | |
if name_tag: | |
details['name'] = name_tag.get_text(strip=True) | |
elif details['name'] == 'N/A': # フォールバックで<title>から取得 | |
title_tag = soup.find('title') | |
if title_tag and title_tag.string: | |
title_text = title_tag.string.replace('- Google マップ', '').strip() | |
if title_text.lower() != "google マップ": details["name"] = title_text | |
# 電話、住所、ウェブサイトなどの情報を抽出 | |
selectors_map = { | |
"phone": ['button[data-item-id^="phone:tel:"]', 'div.Io6YTe', 'button[aria-label*="電話番号"]'], | |
"address": ['button[data-item-id="address"]', 'div.rogA2c', 'button[aria-label*="住所"]'], | |
"website": ['a[data-item-id="authority"][href^="http"]', 'button[data-item-id="authority"]', 'a[aria-label*="ウェブサイト"][href^="http"]'], | |
"other_link": ['a.CsEnBe[href^="http"]'] # 公式サイト以外のリンク | |
} | |
for info_type, selectors in selectors_map.items(): | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
found_val = None | |
for selector in selectors: | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
element = search_root.select_one(selector) | |
# コンテナ内で見つからなければページ全体で再検索 | |
if not element and search_root != soup: | |
element = soup.select_one(selector) | |
if element: | |
data_item_id = element.get('data-item-id', '') | |
aria_label = element.get('aria-label', '') | |
element_text = element.get_text(strip=True) | |
href = element.get('href') | |
if info_type == "phone": | |
phone_num = None | |
if data_item_id.startswith('phone:tel:'): phone_num = data_item_id.split(':')[-1] | |
elif "電話番号:" in aria_label: phone_num = re.search(r'([\d-]+)', aria_label.split("電話番号:")[-1]) | |
elif element.name == 'div' and re.match(r'^[\d\s-]+$', element_text): phone_num = element_text | |
# 電話番号形式の整形と検証 | |
if isinstance(phone_num, str): phone_num = phone_num.strip() | |
elif hasattr(phone_num, 'group'): phone_num = phone_num.group(1).strip() | |
if phone_num and re.match(r'^[\d-]+$', phone_num.replace('ー','-')): | |
found_val = phone_num.replace('ー','-') | |
break # 電話番号が見つかったらループ脱出 | |
elif info_type == "address": | |
addr_text = None | |
if data_item_id == 'address': addr_text = element_text | |
elif "住所:" in aria_label: addr_text = aria_label.split("住所:")[-1].split('(新しいウィンドウで開きます)')[0].strip() | |
elif element.name == 'div' and ("〒" in element_text or any(k in element_text for k in ["都","道","府","県","市","区","町","村"])): addr_text = element_text | |
# 住所らしき文字列か簡易チェック | |
if addr_text and len(addr_text) > 5: # ある程度の長さがあるか | |
found_val = addr_text | |
break # 住所が見つかったらループ脱出 | |
elif info_type == "website" or info_type == "other_link": | |
if href and href.startswith('http') and 'google.com' not in urlparse(href).netloc: # Google自身のリンクは除外 | |
link_name = "N/A"; is_website = False | |
# リンクの種類を判別 | |
if data_item_id == 'authority' or "ウェブサイト" in aria_label: | |
link_name = element_text if is_domain_like(element_text) else "ウェブサイト" | |
is_website = True | |
elif info_type == "other_link": | |
link_name = f"リンク ({element_text})" if element_text else "外部リンク" | |
elif is_domain_like(element_text): # ドメイン名らしきテキストの場合 | |
link_name = element_text | |
if link_name != "N/A": | |
normalized_url = href.rstrip('/') | |
# 重複を避けて links 辞書に追加 | |
if not any(existing_url.rstrip('/') == normalized_url for existing_url in details["links"].values()): | |
details["links"][link_name] = href | |
# website タイプで見つかったものを優先的にメインURL候補へ (まだ未設定の場合) | |
if is_website and details["url"] == "": | |
details["url"] = href | |
# website タイプならこのセレクタでの探索は終了 | |
if info_type == "website": | |
found_val = href # 見つかったことを示す | |
break # websiteセレクタのループ脱出 | |
# 各タイプの最初の有効な値を details に格納 (other_link は除く) | |
if found_val and info_type in details and info_type != "other_link": | |
details[info_type] = found_val | |
# メインURLがまだ決まっていない場合、links 辞書から探す | |
if details["url"] == "": | |
priority = ["ウェブサイト", "authority"] # 公式サイトらしき名前を優先 | |
found_url_in_links = False | |
for p_word in priority: | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
for name, url in details["links"].items(): | |
if p_word in name.lower(): | |
details["url"] = url | |
found_url_in_links = True | |
break | |
if found_url_in_links: | |
break | |
# それでも見つからなければ、ドメイン名らしきリンク > 最初のリンク | |
if not found_url_in_links: | |
domain_link = next((url for name, url in details["links"].items() if is_domain_like(name)), None) | |
if domain_link: | |
details["url"] = domain_link | |
elif details["links"]: # linksに何かあれば最初のものをURLとする | |
details["url"] = next(iter(details["links"].values())) | |
print(f" 基本情報抽出完了: Name='{details['name']}'") | |
# --- 口コミ情報の抽出 --- | |
print(" 口コミ情報抽出開始 (span.wiI7pd 優先)...") | |
review_container_selector = 'div.GHT2ce.NsCY4' | |
review_container = soup.select_one(review_container_selector) | |
if review_container: | |
print(f" '{review_container_selector}' 口コミコンテナ発見。") | |
# 口コミカードの特定 (jftiEf or MyEned) | |
review_card_selectors = ['div.jftiEf', 'div.MyEned'] | |
review_cards = [] | |
for sel in review_card_selectors: | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
review_cards = review_container.select(sel) | |
if review_cards: | |
print(f" 口コミカードセレクタ '{sel}' で {len(review_cards)} 件発見。") | |
break | |
if not review_cards: | |
print(" 警告: 口コミコンテナ内で口コミカードが見つかりません。") | |
extracted_reviews = [] | |
for card_idx, card in enumerate(review_cards): | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
try: | |
review_text = "N/A"; reviewer_name = "N/A"; rating = "N/A" | |
# 口コミ本文抽出 (span.wiI7pd 優先) | |
text_span_wiI7pd = card.select_one('span.wiI7pd') | |
if text_span_wiI7pd: | |
review_text = text_span_wiI7pd.get_text(strip=True) | |
else: | |
# フォールバック: span[jscontroller="MZnM8e"] | |
full_text_span = card.select_one('span[jscontroller="MZnM8e"]') | |
if full_text_span: | |
review_text = full_text_span.get_text(strip=True) | |
# 投稿者名 (.d4r55) | |
name_el = card.select_one('.d4r55'); | |
if name_el: reviewer_name = name_el.get_text(strip=True) | |
# 評価 (.kvMYJc aria-label) | |
rating_el = card.select_one('.kvMYJc'); | |
if rating_el: | |
aria_label = rating_el.get('aria-label', ''); | |
match = re.search(r'星 (\d+(\.\d+)?)', aria_label) # "星 5.0" などを想定 | |
if match: rating = match.group(1) | |
# 情報が一部でも取れていれば追加 | |
if review_text != "N/A" or reviewer_name != "N/A": | |
extracted_reviews.append({"reviewer": reviewer_name, "rating": rating, "text": review_text if review_text != "N/A" else ""}) | |
except Exception as e_card: | |
print(f" 口コミカード {card_idx+1} の解析中にエラー: {e_card}") | |
extracted_reviews.append({"reviewer": "Error", "rating": "N/A", "text": f"解析エラー: {e_card}"}) | |
details['reviews'] = extracted_reviews | |
print(f" 口コミ抽出完了: {len(details['reviews'])} 件") | |
else: | |
print(f" 警告: '{review_container_selector}' 口コミコンテナが見つかりません。") | |
except InterruptedError as e_interrupt: # 中断エラーをキャッチ | |
print(f" HTML解析処理が中断されました: {e_interrupt}") | |
details['extraction_error'] = "Interrupted" | |
details['status'] = 'Interrupted' # ステータスも中断にする | |
except Exception as e_extract: | |
print(f"★★★★★ HTML抽出処理中にエラーが発生しました ★★★★★") | |
error_trace = traceback.format_exc() | |
print(error_trace) | |
details['extraction_error'] = f"Type: {type(e_extract).__name__}, Msg: {e_extract}\nTrace: {error_trace}" | |
print(f" [HTML Extractor - Details & Reviews (wiI7pd priority)] 完了: Name='{details['name']}'") | |
return details | |
# --- CSV Loading Function (中断チェック追加) --- | |
def load_queries(csv_path): | |
"""CSVファイルを読み込み、1列目のクエリをリストとして返す(中断チェックあり)""" | |
queries = [] | |
encodings_to_try = ['utf-8-sig', 'utf-8', 'cp932', 'shift_jis'] # 試すエンコーディングリスト | |
file_encoding = None | |
print(f"CSVファイル読み込み開始: {os.path.basename(csv_path)}") | |
if not csv_path or not os.path.exists(csv_path): | |
print("エラー: CSVファイルが見つかりません。") | |
return [] | |
# ファイルのエンコーディングを特定 | |
for encoding in encodings_to_try: | |
if interrupt_event.is_set(): print("CSV読み込み中に中断リクエスト検出"); return [] # 中断チェック | |
try: | |
with open(csv_path, 'r', encoding=encoding, errors='strict') as f: | |
f.read(1024) # ファイルの一部を読んでエンコーディングを確認 | |
file_encoding = encoding | |
print(f" エンコーディング '{encoding}' で読み込み試行...") | |
break | |
except (UnicodeDecodeError, LookupError): | |
continue # 次のエンコーディングを試す | |
except Exception as e_enc: | |
print(f" '{encoding}' 試行中に予期せぬエラー: {e_enc}") | |
continue | |
if not file_encoding: | |
print(f"エラー: ファイル '{os.path.basename(csv_path)}' を読み込めるエンコーディングが見つかりません。") | |
return [] | |
line_num = 0 | |
try: | |
with open(csv_path, 'r', encoding=file_encoding, newline='') as f: | |
reader = csv.reader(f) | |
try: | |
if interrupt_event.is_set(): raise InterruptedError("CSV読み込み中に中断リクエスト") # 中断チェック | |
header = next(reader) # 最初の行を読み込む | |
line_num += 1 | |
print(f" 1行目 (ヘッダー可能性あり): {header}") | |
except StopIteration: | |
print("情報: CSVファイルが空です。") | |
return [] # ファイルが空なら終了 | |
except InterruptedError as e_interrupt: | |
print(e_interrupt) | |
return [] | |
# 1行目がヘッダーかどうかを判定 (簡易的) | |
header_keywords = ['query', 'search', 'keyword', 'クエリ', '検索', 'キーワード', '店舗', '会社'] | |
first_col_header = header[0].strip().lower() if header else "" | |
is_header = any(hkw in first_col_header for hkw in header_keywords) | |
# 1行目がヘッダーでなく、かつ内容があればクエリとして追加 | |
if not is_header and header and header[0].strip(): | |
queries.append(header[0].strip()) | |
elif is_header: | |
print(" 1行目はヘッダーと判断しスキップします。") | |
# 2行目以降を処理 | |
for row in reader: | |
if interrupt_event.is_set(): raise InterruptedError("CSV読み込み中に中断リクエスト") # 中断チェック | |
line_num += 1 | |
# 1列目にデータがあればクエリとして追加 | |
if row and row[0].strip(): | |
queries.append(row[0].strip()) | |
# 1列目が空でも他の列にデータがあれば警告を表示 (スキップ対象) | |
elif any(cell.strip() for cell in row): | |
print(f"警告: 行 {line_num} の1列目が空です: {row}。スキップします。") | |
print(f" CSVから {len(queries)} 件の有効なクエリを抽出しました。") | |
except InterruptedError as e_interrupt: # 中断をキャッチ | |
print(e_interrupt) | |
print(f"中断リクエストにより、{len(queries)} 件のクエリまで読み込みました。") | |
return queries # 途中までのクエリを返す | |
except Exception as e: | |
# CSV処理中のエラーハンドリング | |
print(f"★★★★★ CSVファイル処理中にエラー (行 {line_num}) ★★★★★") | |
print(f"エラータイプ: {type(e).__name__}") | |
print(f"エラーメッセージ: {e}") | |
print("--- スタックトレース ---") | |
print(traceback.format_exc()) | |
print("----------------------") | |
return [] # エラー発生時は空リストを返す | |
return queries | |
# --- Single Query Processing Function (中断チェック強化) --- | |
def process_single_query_full_list(driver, query, query_index, output_dir, wait_config): | |
"""単一クエリ処理: 検索→リストスクロール→リンク抽出→詳細ページ→口コミタブ→口コミスクロール→「もっと見る」クリック→HTML取得→解析 (中断チェックあり)""" | |
print(f"\n--- クエリ処理開始 [Index:{query_index}] ---: {query}") | |
results_list = [] | |
safe_query_part = re.sub(r'[\\/*?:"<>|]', '_', query)[:30].strip() or "empty_query" | |
base_url = "https://www.google.com/maps/" | |
# 待機時間設定 | |
WAIT_TIME_BASE = wait_config['base'] | |
WAIT_TIME_DETAIL = wait_config['detail'] | |
WAIT_TIME_SEARCH = wait_config['search'] | |
# スクロール設定 | |
SCROLL_PAUSE_TIME = max(1.5, WAIT_TIME_BASE * 0.5) | |
MAX_SCROLL_ATTEMPTS = 30 | |
SCROLL_PAUSE_TIME_REVIEW = max(1.0, WAIT_TIME_BASE * 0.3) | |
MAX_SCROLL_ATTEMPTS_REVIEW = 500 # 口コミは多い場合があるので回数を増やす | |
REVIEW_SCROLL_STUCK_LIMIT = 5 # 口コミスクロール停止判定の閾値 | |
try: | |
# --- 中断チェック --- | |
if interrupt_event.is_set(): raise InterruptedError("処理開始前に中断リクエスト") | |
# 1. 検索実行とリスト表示待機 | |
search_url = f"https://www.google.com/maps/search/{query.replace(' ', '+')}" | |
print(f" URLにアクセス: {search_url}") | |
driver.get(search_url) | |
if interrupt_event.is_set(): raise InterruptedError("ページ読み込み後に中断リクエスト") | |
print(f" 検索結果リスト表示待機 (最大{WAIT_TIME_SEARCH}秒)...") | |
list_container_selector = 'div[role="feed"], div[aria-label*="の検索結果"]' | |
try: | |
# WebDriverWait も中断可能にするのは難しいので、ここではそのまま | |
list_container = WebDriverWait(driver, WAIT_TIME_SEARCH).until( | |
EC.presence_of_element_located((By.CSS_SELECTOR, list_container_selector)) | |
) | |
WebDriverWait(driver, 10).until( | |
EC.visibility_of_element_located((By.CSS_SELECTOR, f'{list_container_selector} a[href*="/maps/place/"]')) | |
) | |
print(" 検索結果リスト表示を確認。") | |
except TimeoutException as e_timeout: | |
print(f" エラー: 検索結果リストの表示タイムアウト。URL: {search_url}\n{e_timeout}") | |
print("--- HTML Snapshot (Timeout) ---") | |
try: print(driver.page_source[:1000]) | |
except: print(" ページソース取得失敗") | |
print("--- End Snapshot ---") | |
results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': search_url, 'html_filename': 'N/A', 'name': f'Error (List Timeout)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: List Timeout'}) | |
return results_list | |
except Exception as e_wait: | |
print(f"★★★★★ リスト待機中に予期せぬエラー ★★★★★\nURL: {search_url}\n{type(e_wait).__name__}: {e_wait}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': search_url, 'html_filename': 'N/A', 'name': f'Error (List Wait Exception)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: List Wait Exception'}) | |
return results_list | |
# 2. 検索リストのスクロール | |
print(" 検索リストをスクロールして全結果を表示...") | |
last_height = driver.execute_script("return arguments[0].scrollHeight", list_container) | |
scroll_attempts = 0 | |
stuck_count = 0 | |
while scroll_attempts < MAX_SCROLL_ATTEMPTS: | |
if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 中断チェック | |
try: | |
driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', list_container) | |
interruptible_sleep(SCROLL_PAUSE_TIME) # 中断可能な待機 | |
if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 待機後にもチェック | |
new_height = driver.execute_script("return arguments[0].scrollHeight", list_container) | |
end_markers = driver.find_elements(By.XPATH, "//span[contains(text(), '結果は以上です')] | //p[contains(text(), '結果は以上です')]") | |
if any(el.is_displayed() for el in end_markers): | |
print(" 「結果は以上です」表示確認。検索リストスクロール終了。") | |
break | |
if new_height == last_height: | |
stuck_count += 1 | |
print(f" 検索リストスクロール高さ変化なし ({stuck_count}回目)。再試行...") | |
interruptible_sleep(SCROLL_PAUSE_TIME * 1.5) # 中断可能な待機 | |
if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 待機後にもチェック | |
new_height = driver.execute_script("return arguments[0].scrollHeight", list_container) | |
if new_height == last_height and stuck_count >= 3: | |
print(" 高さ変化なしが続いたため、検索リストスクロール終了と判断。") | |
break | |
else: | |
stuck_count = 0 | |
last_height = new_height | |
except Exception as e_scroll: | |
if interrupt_event.is_set(): raise InterruptedError("検索リストスクロールエラー処理中に中断リクエスト") # エラー処理中もチェック | |
print(f"★★★★★ 検索リストスクロール中にエラー ★★★★★\n{type(e_scroll).__name__}: {e_scroll}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
print(" スクロールエラー発生。可能な範囲で続行します。") | |
scroll_attempts += 1 | |
if scroll_attempts >= MAX_SCROLL_ATTEMPTS: | |
print(f" 検索リスト最大スクロール回数 ({MAX_SCROLL_ATTEMPTS}) 到達。") | |
# 3. リンク抽出 | |
if interrupt_event.is_set(): raise InterruptedError("リンク抽出前に中断リクエスト") # 中断チェック | |
print(" 検索結果リストからリンクを抽出...") | |
unique_place_links = set() | |
result_card_selector = '.hfpxzc' | |
try: | |
list_container_updated = WebDriverWait(driver, 10).until( | |
EC.presence_of_element_located((By.CSS_SELECTOR, list_container_selector)) | |
) | |
result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) | |
print(f" '{result_card_selector}' 要素を {len(result_cards)} 件発見。") | |
if not result_cards: | |
print(f" 警告: '{result_card_selector}' が見つかりません。代替セレクタ 'a.hfpxzc' で試行...") | |
result_card_selector = 'a.hfpxzc' | |
result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) | |
print(f" 代替セレクタで {len(result_cards)} 件発見。") | |
if not result_cards: | |
print(f" 警告: 代替セレクタ 'a.Nv2PK' で試行...") | |
result_card_selector = 'a.Nv2PK' | |
result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) | |
print(f" 代替セレクタで {len(result_cards)} 件発見。") | |
link_extraction_errors = 0 | |
for card_idx, card in enumerate(result_cards): | |
if interrupt_event.is_set(): raise InterruptedError("リンク抽出ループ中に中断リクエスト") # 中断チェック | |
try: | |
link_element = None | |
if card.tag_name == 'a': link_element = card | |
else: | |
try: link_element = card.find_element(By.TAG_NAME, 'a') | |
except NoSuchElementException: continue | |
if link_element: | |
href = link_element.get_attribute('href') | |
if href and "/maps/place/" in href and not href.startswith("javascript:"): | |
absolute_href = urljoin(base_url, href) | |
unique_place_links.add(absolute_href) | |
except StaleElementReferenceException: | |
link_extraction_errors += 1 | |
continue | |
except Exception as e_extract_link: | |
print(f"★★★★★ カード {card_idx+1} からのリンク抽出エラー ★★★★★\n{type(e_extract_link).__name__}: {e_extract_link}") | |
link_extraction_errors += 1 | |
if link_extraction_errors > 0: | |
print(f" リンク抽出中に {link_extraction_errors} 件のエラーが発生しました。") | |
print(f" 抽出したユニークリンク数: {len(unique_place_links)}") | |
except Exception as e_find_links: | |
print(f"★★★★★ リンク抽出プロセス全体でエラー ★★★★★\n使用したセレクタ: '{result_card_selector}'\n{type(e_find_links).__name__}: {e_find_links}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': driver.current_url, 'html_filename': 'N/A', 'name': f'Error (Link Extraction Fail)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: Link Extraction Fail'}) | |
return results_list | |
if not unique_place_links: | |
print(" 有効な詳細ページリンクが見つかりませんでした。このクエリの結果はありません。") | |
results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': driver.current_url, 'html_filename': 'N/A', 'name': 'No Results Found', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Success: No Results'}) | |
return results_list | |
# 4. 各リンクの詳細ページを処理 | |
print(f" {len(unique_place_links)} 件の詳細情報を取得...") | |
link_list = sorted(list(unique_place_links)) | |
processed_urls = set() | |
for i, place_url in enumerate(link_list, 1): | |
if interrupt_event.is_set(): raise InterruptedError("詳細ページ処理ループ開始前に中断リクエスト") # 中断チェック | |
if place_url in processed_urls: continue | |
processed_urls.add(place_url) | |
print(f"\n --- 詳細取得 [Query:{query_index}, Result:{i}/{len(link_list)}] ---") | |
result_details = {'query_index': query_index, 'original_query': query, 'result_rank': i, 'place_url': place_url, 'html_filename': 'N/A', 'name': 'N/A', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Pending', 'extraction_error': None} | |
try: | |
print(f" 詳細ページに遷移: {place_url}") | |
driver.get(place_url) | |
if interrupt_event.is_set(): raise InterruptedError("詳細ページ読み込み後に中断リクエスト") | |
WebDriverWait(driver, WAIT_TIME_DETAIL).until( | |
EC.visibility_of_element_located((By.CSS_SELECTOR, 'h1')) | |
) | |
interruptible_sleep(WAIT_TIME_BASE * 0.2) # 中断可能な待機 | |
if interrupt_event.is_set(): raise InterruptedError("詳細ページ待機後に中断リクエスト") | |
# --- 口コミタブをクリック --- | |
review_tab_text = "クチコミ" | |
review_tab_xpath = f"//button[@role='tab'][contains(., '{review_tab_text}') or contains(@aria-label, '{review_tab_text}')]" | |
review_tab_clicked = False | |
review_scroll_element = None | |
try: | |
print(f" {review_tab_text}タブ クリック試行...") | |
review_tab = WebDriverWait(driver, 10).until( | |
EC.element_to_be_clickable((By.XPATH, review_tab_xpath)) | |
) | |
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", review_tab) | |
interruptible_sleep(0.3) | |
if interrupt_event.is_set(): raise InterruptedError("口コミタブクリック前に中断リクエスト") | |
driver.execute_script("arguments[0].click();", review_tab) | |
review_tab_clicked = True | |
print(f" {review_tab_text}タブをクリックしました。口コミコンテナ表示待機...") | |
review_container_selector = 'div.GHT2ce.NsCY4' | |
first_review_card_selector = f'{review_container_selector} div.jftiEf:first-of-type, {review_container_selector} div.MyEned:first-of-type' | |
review_scroll_element = WebDriverWait(driver, WAIT_TIME_DETAIL).until( | |
EC.visibility_of_element_located((By.CSS_SELECTOR, review_container_selector)) | |
) | |
WebDriverWait(driver, 5).until( | |
EC.visibility_of_element_located((By.CSS_SELECTOR, first_review_card_selector)) | |
) | |
print(f" 口コミコンテナ表示確認、スクロール要素取得。") | |
interruptible_sleep(WAIT_TIME_BASE * 0.5) | |
if interrupt_event.is_set(): raise InterruptedError("口コミコンテナ待機後に中断リクエスト") | |
except TimeoutException: print(f" 警告: {review_tab_text}タブまたは口コミコンテナの表示タイムアウト。") | |
except ElementClickInterceptedException: print(f" 警告: {review_tab_text}タブのクリックが遮られました。") | |
except NoSuchElementException: print(f" 警告: {review_tab_text}タブが見つかりません。") | |
except Exception as e_click_review: print(f"★★★★★ {review_tab_text}タブ処理中に予期せぬエラー ★★★★★\n{type(e_click_review).__name__}: {e_click_review}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
# --- 口コミエリアのスクロール処理 --- | |
if review_scroll_element: | |
print(" 口コミエリアをスクロールして全件表示試行...") | |
review_last_height = driver.execute_script("return arguments[0].scrollHeight", review_scroll_element) | |
review_scroll_attempts = 0 | |
review_stuck_count = 0 | |
while review_scroll_attempts < MAX_SCROLL_ATTEMPTS_REVIEW: | |
if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 中断チェック | |
try: | |
driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', review_scroll_element) | |
interruptible_sleep(SCROLL_PAUSE_TIME_REVIEW) # 中断可能な待機 | |
if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 待機後にもチェック | |
review_new_height = driver.execute_script("return arguments[0].scrollHeight", review_scroll_element) | |
if review_new_height == review_last_height: | |
review_stuck_count += 1 | |
if review_stuck_count >= REVIEW_SCROLL_STUCK_LIMIT: | |
print(f" 口コミスクロール高さが{REVIEW_SCROLL_STUCK_LIMIT}回変化なし。スクロール終了と判断。") | |
break | |
else: | |
interruptible_sleep(SCROLL_PAUSE_TIME_REVIEW * 2) # 中断可能な待機 | |
if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 待機後にもチェック | |
else: | |
review_stuck_count = 0 | |
review_last_height = review_new_height | |
except Exception as e_review_scroll: | |
if interrupt_event.is_set(): raise InterruptedError("口コミスクロールエラー処理中に中断リクエスト") | |
print(f"★★★★★ 口コミスクロール中にエラー ★★★★★\n{type(e_review_scroll).__name__}: {e_review_scroll}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
print(" 口コミスクロールエラー発生。可能な範囲で続行します。") | |
break | |
review_scroll_attempts += 1 | |
if review_scroll_attempts >= MAX_SCROLL_ATTEMPTS_REVIEW: | |
print(f" 最大口コミスクロール回数 ({MAX_SCROLL_ATTEMPTS_REVIEW}) 到達。") | |
print(" 口コミエリアのスクロール完了。") | |
elif review_tab_clicked: print(" 警告: 口コミスクロール要素が見つからなかったため、口コミスクロールをスキップします。") | |
# --- 「もっと見る」ボタンをクリック --- | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック前に中断リクエスト") | |
if review_tab_clicked and review_scroll_element: | |
print(" 「もっと見る」ボタンを検索してクリック試行...") | |
more_buttons_xpath = "//button[contains(text(), 'もっと見る')]" | |
clicked_count = 0 | |
click_attempts = 0 | |
max_click_attempts = 3 | |
while click_attempts < max_click_attempts: | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」ループ中に中断リクエスト") # 中断チェック | |
buttons_found_this_round = 0 | |
try: | |
more_buttons = driver.find_elements(By.XPATH, more_buttons_xpath) | |
if not more_buttons: | |
if click_attempts == 0: print(" 「もっと見る」ボタンが見つかりませんでした。") | |
else: print(f" 追加の「もっと見る」ボタンは見つかりませんでした (試行 {click_attempts+1}/{max_click_attempts})。") | |
break | |
print(f" 「もっと見る」ボタンを {len(more_buttons)} 個発見 (試行 {click_attempts+1}/{max_click_attempts})。クリック開始...") | |
for btn_idx, button in enumerate(more_buttons): | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") # 中断チェック | |
try: | |
if button.is_displayed() and button.is_enabled(): | |
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", button) | |
interruptible_sleep(0.2) | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") | |
driver.execute_script("arguments[0].click();", button) | |
clicked_count += 1 | |
buttons_found_this_round += 1 | |
interruptible_sleep(0.3) | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") | |
except ElementClickInterceptedException: print(f" ボタン {btn_idx+1} のクリックが遮られました。スキップします。") | |
except StaleElementReferenceException: print(f" ボタン {btn_idx+1} が古くなりました。スキップします。") | |
except Exception as e_click_more: print(f" ボタン {btn_idx+1} のクリック中にエラー: {e_click_more}") | |
print(f" 今回の試行で {buttons_found_this_round} 個の「もっと見る」ボタンをクリックしました。") | |
if buttons_found_this_round == 0: | |
print(" これ以上クリックできる「もっと見る」ボタンはありませんでした。") | |
break | |
except Exception as e_find_more: | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」検索エラー処理中に中断リクエスト") | |
print(f"★★★★★ 「もっと見る」ボタン検索中にエラー ★★★★★\n{type(e_find_more).__name__}: {e_find_more}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
break | |
click_attempts += 1 | |
if click_attempts < max_click_attempts: | |
interruptible_sleep(1.0) | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」試行間待機中に中断リクエスト") | |
if clicked_count > 0: print(f" 合計 {clicked_count} 個の「もっと見る」ボタンをクリックしました。") | |
else: print(" クリックされた「もっと見る」ボタンはありませんでした。") | |
interruptible_sleep(WAIT_TIME_BASE * 0.5) | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック後に中断リクエスト") | |
# --- HTML取得と保存 --- | |
print(" ページのHTMLを取得・保存中...") | |
detail_html_content = "" | |
try: | |
if interrupt_event.is_set(): raise InterruptedError("HTML取得前に中断リクエスト") | |
detail_html_content = driver.page_source | |
temp_name = 'N/A' | |
try: temp_name = driver.find_element(By.TAG_NAME, 'h1').text | |
except: pass | |
safe_place_name_part = re.sub(r'[\\/*?:"<>|]', '_', temp_name)[:20].strip() or "no_name" | |
tab_suffix = "_reviews_expanded" if review_tab_clicked else "_overview" | |
detail_html_fname = f"Q{query_index:03d}_R{i:03d}_{safe_place_name_part}_{safe_query_part}_detail{tab_suffix}.html" | |
detail_html_path = os.path.join(output_dir, detail_html_fname) | |
with open(detail_html_path, 'w', encoding='utf-8') as f: | |
f.write(detail_html_content) | |
result_details['html_filename'] = detail_html_fname | |
print(f" HTMLを保存しました: {detail_html_fname}") | |
except Exception as e_save_html: | |
print(f" HTML取得/保存エラー: {e_save_html}") | |
result_details['html_filename'] = 'Error Saving HTML' | |
# --- HTML解析 --- | |
if detail_html_content: | |
print(" HTMLを解析して情報を抽出中...") | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析前に中断リクエスト") | |
extracted_info = extract_details_and_reviews_from_html(detail_html_content) | |
result_details.update(extracted_info) | |
# 抽出関数内で中断された場合、ステータスが'Interrupted'になっているはず | |
if result_details.get('status') != 'Interrupted': | |
if result_details.get('extraction_error'): | |
result_details['status'] = f"Warning: HTML Extraction Error" | |
else: | |
result_details['status'] = 'Success' | |
print(" HTML解析完了。") | |
else: | |
print(" エラー: HTMLコンテンツが空のため、情報抽出をスキップします。") | |
result_details['status'] = 'Error: Empty HTML Content' | |
except TimeoutException as e_timeout_detail: | |
print(f"★★★★★ 詳細ページ読み込みタイムアウト ★★★★★\nURL: {place_url}\n{e_timeout_detail}") | |
print("--- HTML Snapshot (Timeout) ---") | |
try: print(driver.page_source[:1000]) | |
except: print(" ページソース取得失敗") | |
print("--- End Snapshot ---") | |
result_details['status'] = f'Error: Detail Page Timeout'; result_details['name'] = f"Error (Timeout R:{i})" | |
except NoSuchElementException as e_nse: | |
print(f"★★★★★ 詳細ページで必須要素(h1など)が見つかりません ★★★★★\nURL: {place_url}\n{e_nse}") | |
print("--- HTML Snapshot (NSE) ---") | |
try: print(driver.page_source[:1000]) | |
except: print(" ページソース取得失敗") | |
print("--- End Snapshot ---") | |
result_details['status'] = f'Error: Detail Page Missing Element (e.g., h1)'; result_details['name'] = f"Error (ElementNotFound R:{i})" | |
except Exception as e_detail: | |
if interrupt_event.is_set(): raise InterruptedError("詳細ページ例外処理中に中断リクエスト") # 例外処理中もチェック | |
print(f"★★★★★ 詳細ページ処理中に予期せぬエラー ★★★★★\nURL: {place_url}\n{type(e_detail).__name__}: {e_detail}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
result_details['status'] = f'Error: Detail Page Exception - {type(e_detail).__name__}'; result_details['name'] = f"Error (Exception R:{i})" | |
finally: | |
# 中断された場合、ステータスを上書き | |
if interrupt_event.is_set() and result_details.get('status') != 'Interrupted': | |
result_details['status'] = 'Interrupted' | |
results_list.append(result_details) | |
except InterruptedError as e_interrupt: # クエリ処理全体で中断をキャッチ | |
print(f"★★★★★ クエリ '{query}' [Index:{query_index}] の処理中に中断リクエスト: {e_interrupt} ★★★★★") | |
# 中断されたことを示す結果を追加 | |
results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 'N/A', 'place_url': 'N/A', 'html_filename': 'N/A', 'name': f'Interrupted Query {query_index}', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Interrupted'}) | |
# ★重要★ 中断例外を再度発生させ、run_scraping関数に中断を伝える | |
raise e_interrupt | |
except Exception as e_main_query: | |
print(f"★★★★★ クエリ '{query}' [Index:{query_index}] の処理全体でエラー ★★★★★\n{type(e_main_query).__name__}: {e_main_query}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': 'N/A', 'html_filename': 'N/A', 'name': f'Error (Overall Query {query_index})', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: Query Level Exception - {type(e_main_query).__name__}'}) | |
finally: | |
status_msg = "中断" if interrupt_event.is_set() else "完了" | |
print(f"--- クエリ処理{status_msg} [Index:{query_index}] - {len(results_list)} 件の結果 ---") | |
return results_list | |
# --- 中断リクエスト用関数 --- | |
def request_interrupt(): | |
"""中断フラグをセットする""" | |
if not interrupt_event.is_set(): | |
print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") | |
print("!!! 中断リクエストを受け付けました。 !!!") | |
print("!!! 現在の処理が完了次第、停止します... !!!") | |
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n") | |
interrupt_event.set() | |
else: | |
print("\n--- 中断は既にリクエストされています ---") | |
# GradioのTextboxに即時反映させるため、ダミーの値を返す | |
# (clickイベントのoutputsにTextboxを指定する必要があるため) | |
# 実際にはログは run_scraping 内で更新される | |
return "[中断リクエスト受信]" | |
# --- Gradio Processing Function (中断処理対応) --- | |
def run_scraping(input_csv_file, output_dir_name, output_csv_name, csv_encoding, | |
wait_time_base, wait_time_detail, wait_time_search, headless_mode, progress=gr.Progress()): | |
"""Gradioインターフェースから呼び出されるメイン処理関数(中断機能付き)""" | |
log_stream = io.StringIO() # ログ出力用 | |
start_time_total = time.time() # 全体処理時間計測開始 | |
driver = None # WebDriverオブジェクト初期化 | |
processed_query_count = 0 # 処理済みクエリ数 | |
total_results_count = 0 # CSV書き込み総行数 | |
total_queries = 0 # 総クエリ数 | |
output_csv_path = None # 出力CSVファイルパス | |
interrupted_flag = False # 処理が中断されたかを示すフラグ | |
# --- 中断フラグをリセット --- | |
interrupt_event.clear() | |
print("中断フラグをリセットしました。", file=log_stream) | |
# 標準出力と標準エラー出力をログストリームにリダイレクト | |
with contextlib.redirect_stdout(log_stream), contextlib.redirect_stderr(log_stream): | |
try: | |
print("=== 処理開始 ===") | |
print(f"開始時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
# 入力ファイルチェック | |
if input_csv_file is None: | |
print("エラー: CSVファイルが選択されていません。処理を中断します。") | |
yield log_stream.getvalue(), None # ログと空の結果を返す | |
return | |
yield log_stream.getvalue(), None # 初期ログをUIに反映 | |
# パラメータ設定 | |
SEARCH_QUERIES_CSV_PATH = input_csv_file.name | |
OUTPUT_DIR = output_dir_name.strip() or "html_reviews_expanded" | |
OUTPUT_CSV_FILENAME = output_csv_name.strip() or "結果_reviews_expanded.csv" | |
CSV_ENCODING = csv_encoding | |
try: | |
wait_config = { | |
'base': max(1.0, float(wait_time_base)), | |
'detail': max(10.0, float(wait_time_detail)), | |
'search': max(5.0, float(wait_time_search)) | |
} | |
except ValueError: | |
print("警告: 待機時間に無効な値が入力されました。デフォルト値を使用します。") | |
wait_config = {'base': 4.0, 'detail': 20.0, 'search': 15.0} | |
print(f"待機時間設定: 基本={wait_config['base']}秒, 詳細/口コミ={wait_config['detail']}秒, 検索={wait_config['search']}秒") | |
yield log_stream.getvalue(), None | |
# 出力ディレクトリ設定と作成 | |
if not os.path.isabs(OUTPUT_DIR): | |
OUTPUT_DIR = os.path.join(os.getcwd(), OUTPUT_DIR) | |
output_csv_path = os.path.join(OUTPUT_DIR, OUTPUT_CSV_FILENAME) | |
print(f"HTML出力先ディレクトリ: {OUTPUT_DIR}") | |
print(f"CSV出力先ファイル: {output_csv_path}") | |
os.makedirs(OUTPUT_DIR, exist_ok=True) | |
yield log_stream.getvalue(), None | |
# CSVからクエリ読み込み (中断チェックあり) | |
queries = load_queries(SEARCH_QUERIES_CSV_PATH) | |
yield log_stream.getvalue(), None | |
if interrupt_event.is_set(): # 読み込み中に中断されたかチェック | |
print("CSV読み込み中に中断されたため、処理を終了します。") | |
interrupted_flag = True | |
raise InterruptedError("CSV loading interrupted") # 処理を中断フローへ | |
if not queries: | |
print("エラー: CSVから処理可能なクエリが見つかりませんでした。処理を終了します。") | |
yield log_stream.getvalue(), None | |
return | |
total_queries = len(queries) | |
print(f"{total_queries} 件のクエリを処理します。") | |
yield log_stream.getvalue(), None | |
# --- 中断チェック --- | |
if interrupt_event.is_set(): raise InterruptedError("WebDriver初期化前に中断リクエスト") | |
# WebDriver初期化 | |
progress(0, desc="WebDriver初期化中...") | |
print("\nWebDriver初期化中...") | |
yield log_stream.getvalue(), None | |
options = Options() | |
options.add_argument('--no-sandbox') | |
options.add_argument('--disable-dev-shm-usage') | |
options.add_argument('--lang=ja-JP') | |
options.add_argument("--window-size=1920,1080") | |
options.add_argument('--disable-extensions') | |
options.add_argument('--disable-blink-features=AutomationControlled') | |
options.add_argument('--disable-gpu') | |
options.add_experimental_option('excludeSwitches', ['enable-automation']) | |
options.add_experimental_option('useAutomationExtension', False) | |
options.add_experimental_option("prefs", { | |
"credentials_enable_service": False, | |
"profile.password_manager_enabled": False | |
}) | |
if headless_mode: | |
print(" ヘッドレスモードで実行します。") | |
options.add_argument('--headless=new') | |
else: | |
print(" 通常モード (非ヘッドレス) で実行します。") | |
try: | |
if IN_COLAB and gs: | |
print(" Colab環境でgoogle_colab_seleniumを使用します。") | |
driver = gs.Chrome(options=options) | |
elif not IN_COLAB and ChromeService and ChromeDriverManager: | |
try: | |
print(" webdriver-managerを使用してChromeDriverパスを解決します...") | |
service = ChromeService(ChromeDriverManager().install()) | |
driver = webdriver.Chrome(service=service, options=options) | |
print(" ChromeDriver (webdriver-manager) 起動成功。") | |
except Exception as e_wdm: | |
print(f" webdriver-managerでの初期化エラー: {e_wdm}") | |
print(" PATH上のChromeDriverで試行します...") | |
driver = webdriver.Chrome(options=options) | |
print(" ChromeDriver (PATH) 起動成功。") | |
elif not IN_COLAB: | |
print(" PATH上のChromeDriverを使用します...") | |
driver = webdriver.Chrome(options=options) | |
print(" ChromeDriver (PATH) 起動成功。") | |
else: | |
raise Exception("WebDriverを初期化できませんでした。適切なWebDriver設定が見つかりません。") | |
driver.implicitly_wait(3) | |
print("WebDriver初期化完了。") | |
except Exception as e_wd_init: | |
print(f"★★★★★ WebDriver初期化失敗 ★★★★★") | |
print(f"エラータイプ: {type(e_wd_init).__name__}") | |
print(f"エラーメッセージ: {e_wd_init}") | |
print("--- スタックトレース ---\n", traceback.format_exc(), "\n----------------------") | |
print("ヒント: ChromeDriverのバージョンとChromeブラウザのバージョンが一致しているか確認してください。") | |
if not IN_COLAB: print(" `webdriver-manager`がインストールされていない場合は `pip install webdriver-manager` を試してください。") | |
yield log_stream.getvalue(), None | |
return | |
yield log_stream.getvalue(), None | |
# --- 中断チェック --- | |
if interrupt_event.is_set(): raise InterruptedError("CSV処理開始前に中断リクエスト") | |
csv_header = ['QueryIndex', 'OriginalQuery', 'ResultRank', 'Status', 'ExtractedName', | |
'ExtractedWebsite', 'ExtractedPhone', 'ExtractedAddress', 'Reviews', | |
'ExtractionError', 'PlaceURL', 'DetailHTMLFilename'] | |
file_exists = os.path.exists(output_csv_path) | |
file_mode = 'a' if file_exists and os.path.getsize(output_csv_path) > 0 else 'w' | |
print(f"CSVファイルを '{file_mode}' モードで開きます (エンコーディング: {CSV_ENCODING})。") | |
yield log_stream.getvalue(), None | |
try: | |
with open(output_csv_path, file_mode, newline='', encoding=CSV_ENCODING, errors='replace') as csv_file: | |
writer = csv.writer(csv_file) | |
if file_mode == 'w': | |
print(" 新規CSVファイルのためヘッダー行を書き込みます。") | |
writer.writerow(csv_header) | |
csv_file.flush() | |
elif file_exists: | |
print(f" 既存ファイル '{os.path.basename(output_csv_path)}' に追記します。") | |
for i, query in enumerate(queries, 1): | |
# --- ループ開始時に中断チェック --- | |
if interrupt_event.is_set(): | |
print(f"\n===== クエリ {i}/{total_queries} の処理開始前に中断リクエストを検出 =====") | |
interrupted_flag = True | |
break # ループを抜ける | |
progress(i / total_queries, desc=f"クエリ {i}/{total_queries} 処理中: {query[:30]}...") | |
start_time_query = time.time() | |
print(f"\n===== クエリ {i}/{total_queries} 開始: '{query}' =====") | |
yield log_stream.getvalue(), None | |
results = [] | |
try: | |
# --- 単一クエリのスクレイピング処理実行 (中断例外をキャッチ) --- | |
results = process_single_query_full_list(driver, query, i, OUTPUT_DIR, wait_config) | |
except InterruptedError as e_interrupt_query: | |
print(f"クエリ {i} の処理が中断されました: {e_interrupt_query}") | |
interrupted_flag = True # メインループに中断を伝える | |
# results には中断時点までの結果が入っている可能性がある | |
if not any(r['status'] == 'Interrupted' for r in results): | |
# results に中断を示すものがなければ追加 | |
results.append({'query_index': i, 'original_query': query, 'result_rank': 'N/A', 'status': 'Interrupted', 'name': f'Interrupted Query {i}', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'extraction_error': str(e_interrupt_query), 'place_url': 'N/A', 'html_filename': 'N/A'}) | |
yield log_stream.getvalue(), None | |
# --- 取得結果をCSVに書き込み --- | |
written_count_query = 0 | |
print(f" クエリ {i} の結果をCSVに書き込み中...") | |
for result_data in results: | |
try: | |
reviews_list = result_data.get('reviews', []) | |
formatted_reviews = "" | |
if isinstance(reviews_list, list) and reviews_list: | |
review_texts = [] | |
for idx, review_item in enumerate(reviews_list): | |
if isinstance(review_item, dict): | |
r_text = str(review_item.get('text', '')).replace('\n', ' ').replace('\r', '') | |
review_texts.append(f"[{idx+1}] 投稿者: {review_item.get('reviewer', 'N/A')} | 評価: {review_item.get('rating', 'N/A')} | 本文: {r_text}") | |
elif isinstance(review_item, str): | |
review_texts.append(f"[{idx+1}] {review_item.replace('n', ' ').replace('r', '')}") | |
formatted_reviews = "\n\n".join(review_texts) | |
elif isinstance(reviews_list, str): | |
formatted_reviews = reviews_list.replace('\n', ' ').replace('\r', '') | |
extraction_error_msg = result_data.get('extraction_error', '') | |
if extraction_error_msg and len(extraction_error_msg) > 500: | |
extraction_error_msg = extraction_error_msg[:250] + "..." + extraction_error_msg[-250:] | |
row_data = [ | |
result_data.get('query_index', i), result_data.get('original_query', query), | |
result_data.get('result_rank', 'N/A'), result_data.get('status', 'Unknown'), | |
result_data.get('name', 'N/A'), result_data.get('url', ''), | |
result_data.get('phone', 'N/A'), result_data.get('address', 'N/A'), | |
formatted_reviews, extraction_error_msg, | |
result_data.get('place_url', 'N/A'), result_data.get('html_filename', 'N/A') | |
] | |
writer.writerow(row_data) | |
written_count_query += 1 | |
except Exception as e_write: | |
print(f"★★★★★ CSV書き込み中にエラーが発生しました (行スキップ) ★★★★★") | |
print(f"エラーデータ (一部): {str(result_data)[:200]}...") | |
print(f"エラータイプ: {type(e_write).__name__}: {e_write}") | |
csv_file.flush() | |
total_results_count += written_count_query | |
processed_query_count += 1 | |
end_time_query = time.time() | |
query_status_msg = "中断" if result_data.get('status') == 'Interrupted' else "完了" | |
print(f"===== クエリ {i}/{total_queries} {query_status_msg} - {written_count_query}件書き込み, 所要時間: {end_time_query - start_time_query:.2f} 秒 =====") | |
yield log_stream.getvalue(), None | |
# 中断フラグが立っていたら、ループを終了 | |
if interrupted_flag: | |
print("\n中断リクエストに従い、次のクエリへ進まず処理を終了します。") | |
break | |
# --- クエリ間の待機 (中断可能) --- | |
if i < total_queries: | |
sleep_duration = wait_config['base'] * 1.5 + (hash(query + str(i)) % (wait_config['base'] * 1.5)) | |
sleep_duration = max(wait_config['base'] * 0.8, min(sleep_duration, wait_config['base'] * 4.0)) | |
print(f"次のクエリまで {sleep_duration:.2f} 秒待機します...") | |
yield log_stream.getvalue(), None | |
interruptible_sleep(sleep_duration) | |
# 待機後にも中断チェック | |
if interrupt_event.is_set(): | |
print("待機中に中断リクエストを検出。処理を終了します。") | |
interrupted_flag = True | |
break # ループを抜ける | |
else: | |
print("\n全クエリの処理が完了しました。") | |
except IOError as e_io: | |
print(f"★★★★★ CSVファイル '{output_csv_path}' のオープン/書き込み中にIOエラー ★★★★★") | |
print(f"エラータイプ: {type(e_io).__name__}: {e_io}\n--- Traceback ---\n{traceback.format_exc()}\n----------------------") | |
print("ファイルが他のプログラムで開かれていないか、書き込み権限があるか確認してください。") | |
output_csv_path = None # 結果ファイルパスを無効化 | |
except Exception as e_csv_loop: | |
print(f"★★★★★ CSV処理ループ中に予期せぬエラー ★★★★★") | |
print(f"エラータイプ: {type(e_csv_loop).__name__}: {e_csv_loop}\n--- Traceback ---\n{traceback.format_exc()}\n----------------------") | |
except InterruptedError: # run_scraping全体で中断をキャッチ | |
print("\n★★★★★ 処理がユーザーによって中断されました ★★★★★") | |
interrupted_flag = True # 中断フラグを立てる | |
# ここで特別な処理は不要、finallyブロックで終了処理が行われる | |
except Exception as e_main: | |
print(f"\n★★★★★ メイン処理 (run_scraping) 中に予期せぬエラーが発生しました ★★★★★") | |
print(f"エラータイプ: {type(e_main).__name__}: {e_main}") | |
print("\n--- スタックトレース ---\n", traceback.format_exc(), "\n----------------------") | |
# エラー発生時も、可能な限りログと途中までのCSVを返す | |
finally: | |
# --- 終了処理 --- | |
if driver: | |
print("\nWebDriver終了処理中...") | |
try: | |
driver.quit() | |
print("WebDriver正常終了。") | |
except Exception as e_quit: | |
print(f"★★★★★ WebDriver終了時にエラー ★★★★★") | |
print(f"エラータイプ: {type(e_quit).__name__}: {e_quit}") | |
end_time_total = time.time() | |
total_duration_seconds = end_time_total - start_time_total | |
final_status = "中断" if interrupted_flag else "完了" | |
print(f"\n=== 全処理終了 ({final_status}) ===") | |
print(f"終了時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
print(f"処理{final_status}クエリ数: {processed_query_count}/{total_queries if total_queries > 0 else 'N/A'} 件") | |
print(f"CSV書き込み総行数: {total_results_count} 件") | |
print(f"総処理時間: {total_duration_seconds:.2f} 秒 ({total_duration_seconds/60:.2f} 分)") | |
if interrupted_flag: | |
print("*** 処理は途中で中断されました ***") | |
final_log = log_stream.getvalue() | |
# プログレスバーを完了状態にする | |
progress(1.0, desc=f"処理{final_status}") | |
if output_csv_path and os.path.exists(output_csv_path) and os.path.getsize(output_csv_path) > 0: | |
print(f"結果CSVファイル: {output_csv_path}") | |
yield final_log, gr.File(value=output_csv_path, label=f"結果CSVダウンロード ({final_status})") | |
elif output_csv_path: | |
print(f"警告: 結果CSVファイル '{output_csv_path}' は空または存在しません。") | |
yield final_log, None | |
else: | |
print("結果CSVファイルは生成されませんでした。") | |
yield final_log, None | |
# --- Gradio UI 定義 (中断ボタン追加) --- | |
with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
gr.Markdown("# Google Maps スクレイピング (口コミ全件表示試行・中断機能付き)") | |
gr.Markdown( | |
""" | |
CSVクエリで検索し、詳細ページで「クチコミ」タブをクリック後、口コミエリアを**最後までスクロール**し、 | |
さらに**「もっと見る」ボタンを全てクリック**して全件表示を試みます。 | |
その後、基本情報と口コミ情報(`span.wiI7pd`優先)を抽出し、CSVに出力します。 | |
**「処理中断」ボタン**で進行中の処理を安全に停止できます(現在のクエリ完了後)。 | |
**処理フロー:** | |
1. クエリ検索 → リストスクロール → リンク抽出。 | |
2. 詳細ページ遷移 → **「クチコミ」タブクリック** → 口コミコンテナ待機。 | |
3. **口コミエリアを最後までスクロール**。 | |
4. **「もっと見る」ボタンを全てクリック** (複数回試行)。 | |
5. HTML取得 → **bs4**で解析 (基本情報: `.aIFcqe`優先, 口コミ本文: `span.wiI7pd`優先)。 | |
6. 結果をCSVに出力(HTMLも保存)。 | |
7. 各ステップおよび待機中に**中断リクエストをチェック**。 | |
**注意:** ネットワーク状況やサイト構造の変更により時間がかかる、またはエラーが発生する場合があります。 | |
""" | |
) | |
with gr.Row(): | |
with gr.Column(scale=2): | |
gr.Markdown("### ① 入力ファイルと出力設定") | |
input_csv_file = gr.File(label="検索クエリCSVファイル (1列目のみ使用)", file_types=[".csv"]) | |
output_dir_name = gr.Textbox(label="HTML保存先ディレクトリ名", value="html_reviews_expanded") | |
output_csv_name = gr.Textbox(label="出力CSVファイル名", value="結果_reviews_expanded.csv") | |
csv_encoding = gr.Dropdown(label="出力CSVエンコーディング", choices=['utf-8-sig', 'cp932'], value='utf-8-sig') | |
headless_mode = gr.Checkbox(label="ヘッドレスモードで実行 (エラー発生時はOFF推奨)", value=True) | |
with gr.Column(scale=1): | |
gr.Markdown("### ② 待機時間設定 (秒)") | |
wait_time_base = gr.Number(label="基本待機", minimum=1, maximum=20, step=0.5, value=4) | |
wait_time_detail = gr.Number(label="詳細/口コミ最大待機", minimum=10, maximum=60, step=1, value=25) | |
wait_time_search = gr.Number(label="検索リスト最大待機", minimum=5, maximum=60, step=1, value=15) | |
with gr.Row(): | |
start_button = gr.Button("処理開始", variant="primary", size="lg", scale=3) | |
# --- 中断ボタンを追加 --- | |
stop_button = gr.Button("処理中断", variant="stop", size="lg", scale=1) | |
gr.Markdown("### ③ 処理ステータスとエラーログ") | |
# プログレスバーを追加 | |
progress_bar = gr.Progress(track_tqdm=True) | |
status_textbox = gr.Textbox(label="ログ", lines=25, interactive=False, autoscroll=True, max_lines=2000) | |
gr.Markdown("### ④ 結果ダウンロード") | |
output_csv_download = gr.File(label="結果CSVダウンロード", interactive=False) | |
# ボタンクリック時の動作設定 | |
# 処理開始ボタン | |
start_button.click( | |
fn=run_scraping, | |
inputs=[input_csv_file, output_dir_name, output_csv_name, csv_encoding, | |
wait_time_base, wait_time_detail, wait_time_search, headless_mode], | |
outputs=[status_textbox, output_csv_download], | |
# progress 引数を渡す | |
show_progress='full' # Gradio 組み込みのプログレス表示を使う場合 | |
) | |
# 中断ボタン | |
# stop_button.click(fn=request_interrupt, inputs=None, outputs=None, cancels=[start_event]) # Gradioのcancel機能を使う場合 | |
# cancels 引数を使うには、start_button.click の返り値を変数に受ける必要があるが、 | |
# 複数出力がある場合はタプルになるなど複雑化する。 | |
# ここでは、Python側でフラグを立ててチェックする方式を採用。 | |
# request_interrupt の戻り値を status_textbox に一時的に表示する例 | |
stop_button.click(fn=request_interrupt, inputs=None, outputs=status_textbox) | |
# --- UI起動 --- | |
print("Gradio UIを起動します...") | |
# queue()で複数ユーザー対応、share=Trueで共有リンク生成 | |
demo.queue().launch(share=True, debug=False) |