import gradio as gr import os import csv import json import time from bs4 import BeautifulSoup from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException, ElementClickInterceptedException import re from urllib.parse import urlparse, urljoin import traceback import io import contextlib from datetime import datetime import threading import pandas as pd import tempfile # --- WebDriverの選択 --- IN_COLAB = 'google.colab' in str(globals().get('get_ipython', '')) if IN_COLAB: print("Google Colab環境を検出。google_colab_selenium を使用します。") try: import google_colab_selenium as gs except ImportError: print("google_colab_seleniumが見つかりません。!pip install google-colab-selenium を実行してください。"); gs = None else: print("ローカル環境を検出。通常の selenium webdriver を使用します。") from selenium import webdriver gs = None try: from selenium.webdriver.chrome.service import Service as ChromeService from webdriver_manager.chrome import ChromeDriverManager except ImportError: print("webdriver-manager が見つかりません。 `pip install webdriver-manager` を実行してください。") ChromeService = None ChromeDriverManager = None # --- 中断フラグ --- interrupt_event = threading.Event() # --- Helper Functions (From Script 1) --- def find_prefixed_data_string(data_structure): """データ構造内から ")]}'\n" で始まる文字列を見つける(再帰的検索)""" if isinstance(data_structure, str) and data_structure.startswith(")]}'\n"): return data_structure elif isinstance(data_structure, list): for item in data_structure: if interrupt_event.is_set(): return None # 中断チェック found = find_prefixed_data_string(item) if found: return found elif isinstance(data_structure, dict): for value in data_structure.values(): if interrupt_event.is_set(): return None # 中断チェック found = find_prefixed_data_string(value) if found: return found return None def find_details_data_by_id_or_heuristic(data_list, place_id=None): """ JSONデータリストから詳細情報を含む可能性のあるリストを特定する。 place_idがあればそれを優先し、なければヒューリスティック(住所形式など)で探す。 """ if not isinstance(data_list, list): return None if interrupt_event.is_set(): return None # 中断チェック potential_candidates = [] for item in data_list: if interrupt_event.is_set(): return None # 中断チェック # 詳細データは通常、要素数が比較的多いリスト形式 if not isinstance(item, list) or len(item) < 30: continue is_candidate = False # place_id が指定されていれば、リスト内にそのIDが含まれるかチェック if place_id and place_id in str(item): is_candidate = True # place_id がない場合は、住所らしき情報が含まれるかヒューリスティックにチェック elif not place_id: has_address_like = any( isinstance(sub, str) and ("〒" in sub or any(k in sub for k in ["都", "道", "府", "県", "市", "区", "町", "村", "丁目", "番地", "号"]) or re.search(r'\d+-\d+-\d+', sub)) for sub in item ) if has_address_like: is_candidate = True if is_candidate: potential_candidates.append(item) if not potential_candidates: return None # 候補が1つならそれを返す if len(potential_candidates) == 1: return potential_candidates[0] # 候補が複数ある場合、スコアリングで最もそれらしいものを選ぶ best_candidate = None max_score = -1 for candidate in potential_candidates: if interrupt_event.is_set(): return None # 中断チェック score = len(candidate) # 要素数が多いほど詳細情報の可能性が高い try: # 特定のインデックスにリストが存在するか(構造的な特徴) if any(isinstance(candidate[idx], list) and candidate[idx] for idx in [7, 13, 178] if idx < len(candidate)): score += 50 # URLらしき文字列が含まれるか if 7 < len(candidate) and isinstance(candidate[7], list) and len(candidate[7]) > 0 and isinstance(candidate[7][0], str) and candidate[7][0].startswith('http'): score += 50 # 別の構造的な特徴 if 34 < len(candidate) and isinstance(candidate[34], list) and candidate[34]: score += 30 except Exception: # スコアリング中のエラーは無視 pass if score > max_score: max_score = score best_candidate = candidate return best_candidate def is_domain_like(text): """文字列がドメイン名らしい形式か簡易的に判定""" if not isinstance(text, str): return False text = text.strip().lower() common_tlds = ['.com', '.jp', '.co.jp', '.net', '.org', '.info', '.biz'] # URLスキーマ、パス、特殊文字、全角文字、IPアドレス形式、前後のドット、連続ドットは除外 if re.search(r'^(https?|ftp)://|[/\\?#\s\u3000-\uFFFF:;@!$%^*()=+]', text): return False if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', text): return False if text.startswith('.') or text.endswith('.') or '..' in text: return False # ドットを含み、一般的なTLDで終わるかチェック return '.' in text and any(text.endswith(tld) for tld in common_tlds) def safe_get(data, index, default=None): """ネストされたリストや辞書から安全に値を取得する""" if isinstance(index, int): try: return data[index] if isinstance(data, list) and index < len(data) else default except IndexError: return default elif isinstance(index, list): # インデックスのリストでネストされた要素を取得 current = data for idx in index: if interrupt_event.is_set(): return default # 中断チェック try: if isinstance(current, list) and isinstance(idx, int) and idx < len(current): current = current[idx] elif isinstance(current, dict) and idx in current: current = current[idx] else: return default # 途中でリスト/辞書でない、またはインデックス/キーが存在しない場合 except (IndexError, KeyError, TypeError): return default # その他の予期せぬエラー return current elif isinstance(index, str): # 文字列インデックスは辞書のキーとして扱う return data.get(index, default) if isinstance(data, dict) else default return default # --- 中断チェック付き時間待機関数 --- def interruptible_sleep(duration): """指定された時間待機するが、中断イベントが発生したら即座に終了する""" interrupt_event.wait(timeout=duration) # waitはタイムアウトするかイベントがセットされると戻る # 呼び出し元で interrupt_event.is_set() をチェックする必要がある # --- HTML抽出関数 (本文抽出を span.wiI7pd 優先に変更、中断チェック追加) --- def extract_details_and_reviews_from_html(html_content): """詳細HTMLから基本情報と口コミ情報を抽出 (本文は span.wiI7pd 優先、中断チェックあり)""" # この関数はスクレイピング処理中に呼び出される print(" [HTML Extractor - Details & Reviews (wiI7pd priority)] 開始") soup = BeautifulSoup(html_content, 'lxml' if 'lxml' in globals() else 'html.parser') details = {"name": "N/A", "url": "", "phone": "N/A", "address": "N/A", "links": {}, "reviews": [], "extraction_error": None} try: # --- 基本情報の抽出 --- if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") main_container_selector = '.aIFcqe' main_container = soup.select_one(main_container_selector) search_root = soup # デフォルトはページ全体 if main_container: # print(f" '{main_container_selector}' コンテナ発見。基本情報を抽出。") search_root = main_container # else: # print(f" 警告: '{main_container_selector}' コンテナが見つかりません。ページ全体から基本情報を抽出。") # 名前 (h1タグを探す) if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") name_tag = search_root.find('h1') if name_tag: details['name'] = name_tag.get_text(strip=True) elif details['name'] == 'N/A': # フォールバックでから取得 title_tag = soup.find('title') if title_tag and title_tag.string: title_text = title_tag.string.replace('- Google マップ', '').strip() if title_text.lower() != "google マップ": details["name"] = title_text # 電話、住所、ウェブサイトなどの情報を抽出 selectors_map = { "phone": ['button[data-item-id^="phone:tel:"]', 'div.Io6YTe', 'button[aria-label*="電話番号"]'], "address": ['button[data-item-id="address"]', 'div.rogA2c', 'button[aria-label*="住所"]'], "website": ['a[data-item-id="authority"][href^="http"]', 'button[data-item-id="authority"]', 'a[aria-label*="ウェブサイト"][href^="http"]'], "other_link": ['a.CsEnBe[href^="http"]'] # 公式サイト以外のリンク } for info_type, selectors in selectors_map.items(): if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") found_val = None for selector in selectors: if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") element = search_root.select_one(selector) # コンテナ内で見つからなければページ全体で再検索 if not element and search_root != soup: element = soup.select_one(selector) if element: data_item_id = element.get('data-item-id', '') aria_label = element.get('aria-label', '') element_text = element.get_text(strip=True) href = element.get('href') if info_type == "phone": phone_num = None if data_item_id.startswith('phone:tel:'): phone_num = data_item_id.split(':')[-1] elif "電話番号:" in aria_label: phone_num = re.search(r'([\d-]+)', aria_label.split("電話番号:")[-1]) elif element.name == 'div' and re.match(r'^[\d\s-]+$', element_text): phone_num = element_text # 電話番号形式の整形と検証 if isinstance(phone_num, str): phone_num = phone_num.strip() elif hasattr(phone_num, 'group'): phone_num = phone_num.group(1).strip() if phone_num and re.match(r'^[\d-]+$', phone_num.replace('ー','-')): found_val = phone_num.replace('ー','-') break # 電話番号が見つかったらループ脱出 elif info_type == "address": addr_text = None if data_item_id == 'address': addr_text = element_text elif "住所:" in aria_label: addr_text = aria_label.split("住所:")[-1].split('(新しいウィンドウで開きます)')[0].strip() elif element.name == 'div' and ("〒" in element_text or any(k in element_text for k in ["都","道","府","県","市","区","町","村"])): addr_text = element_text # 住所らしき文字列か簡易チェック if addr_text and len(addr_text) > 5: # ある程度の長さがあるか found_val = addr_text break # 住所が見つかったらループ脱出 elif info_type == "website" or info_type == "other_link": if href and href.startswith('http') and 'google.com' not in urlparse(href).netloc: # Google自身のリンクは除外 link_name = "N/A"; is_website = False # リンクの種類を判別 if data_item_id == 'authority' or "ウェブサイト" in aria_label: link_name = element_text if is_domain_like(element_text) else "ウェブサイト" is_website = True elif info_type == "other_link": link_name = f"リンク ({element_text})" if element_text else "外部リンク" elif is_domain_like(element_text): # ドメイン名らしきテキストの場合 link_name = element_text if link_name != "N/A": normalized_url = href.rstrip('/') # 重複を避けて links 辞書に追加 if not any(existing_url.rstrip('/') == normalized_url for existing_url in details["links"].values()): details["links"][link_name] = href # website タイプで見つかったものを優先的にメインURL候補へ (まだ未設定の場合) if is_website and details["url"] == "": details["url"] = href # website タイプならこのセレクタでの探索は終了 if info_type == "website": found_val = href # 見つかったことを示す break # websiteセレクタのループ脱出 # 各タイプの最初の有効な値を details に格納 (other_link は除く) if found_val and info_type in details and info_type != "other_link": details[info_type] = found_val # メインURLがまだ決まっていない場合、links 辞書から探す if details["url"] == "": priority = ["ウェブサイト", "authority"] # 公式サイトらしき名前を優先 found_url_in_links = False for p_word in priority: if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") for name, url in details["links"].items(): if p_word in name.lower(): details["url"] = url found_url_in_links = True break if found_url_in_links: break # それでも見つからなければ、ドメイン名らしきリンク > 最初のリンク if not found_url_in_links: domain_link = next((url for name, url in details["links"].items() if is_domain_like(name)), None) if domain_link: details["url"] = domain_link elif details["links"]: # linksに何かあれば最初のものをURLとする details["url"] = next(iter(details["links"].values())) # print(f" 基本情報抽出完了: Name='{details['name']}'") # --- 口コミ情報の抽出 --- # print(" 口コミ情報抽出開始 (span.wiI7pd 優先)...") review_container_selector = 'div.GHT2ce.NsCY4' review_container = soup.select_one(review_container_selector) if review_container: # print(f" '{review_container_selector}' 口コミコンテナ発見。") # 口コミカードの特定 (jftiEf or MyEned) review_card_selectors = ['div.jftiEf', 'div.MyEned'] review_cards = [] for sel in review_card_selectors: if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") review_cards = review_container.select(sel) if review_cards: # print(f" 口コミカードセレクタ '{sel}' で {len(review_cards)} 件発見。") break # if not review_cards: # print(" 警告: 口コミコンテナ内で口コミカードが見つかりません。") extracted_reviews = [] for card_idx, card in enumerate(review_cards): if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") try: review_text = "N/A"; reviewer_name = "N/A"; rating = "N/A" # 口コミ本文抽出 (span.wiI7pd 優先) text_span_wiI7pd = card.select_one('span.wiI7pd') if text_span_wiI7pd: review_text = text_span_wiI7pd.get_text(strip=True) else: # フォールバック: span[jscontroller="MZnM8e"] full_text_span = card.select_one('span[jscontroller="MZnM8e"]') if full_text_span: review_text = full_text_span.get_text(strip=True) # 投稿者名 (.d4r55) name_el = card.select_one('.d4r55'); if name_el: reviewer_name = name_el.get_text(strip=True) # 評価 (.kvMYJc aria-label) rating_el = card.select_one('.kvMYJc'); if rating_el: aria_label = rating_el.get('aria-label', ''); match = re.search(r'星 (\d+(\.\d+)?)', aria_label) # "星 5.0" などを想定 if match: rating = match.group(1) # 情報が一部でも取れていれば追加 if review_text != "N/A" or reviewer_name != "N/A": extracted_reviews.append({"reviewer": reviewer_name, "rating": rating, "text": review_text if review_text != "N/A" else ""}) except Exception as e_card: print(f" 口コミカード {card_idx+1} の解析中にエラー: {e_card}") extracted_reviews.append({"reviewer": "Error", "rating": "N/A", "text": f"解析エラー: {e_card}"}) details['reviews'] = extracted_reviews # print(f" 口コミ抽出完了: {len(details['reviews'])} 件") # else: # print(f" 警告: '{review_container_selector}' 口コミコンテナが見つかりません。") except InterruptedError as e_interrupt: # 中断エラーをキャッチ print(f" HTML解析処理が中断されました: {e_interrupt}") details['extraction_error'] = "Interrupted" details['status'] = 'Interrupted' # ステータスも中断にする except Exception as e_extract: print(f"★★★★★ HTML抽出処理中にエラーが発生しました ★★★★★") error_trace = traceback.format_exc() print(error_trace) details['extraction_error'] = f"Type: {type(e_extract).__name__}, Msg: {e_extract}\nTrace: {error_trace}" # print(f" [HTML Extractor - Details & Reviews (wiI7pd priority)] 完了: Name='{details['name']}'") return details # --- CSV Loading Function (From Script 1, 中断チェック追加) --- def load_queries(csv_path): """CSVファイルを読み込み、1列目のクエリをリストとして返す(中断チェックあり)""" queries = [] encodings_to_try = ['utf-8-sig', 'utf-8', 'cp932', 'shift_jis'] # 試すエンコーディングリスト file_encoding = None print(f"CSVファイル読み込み開始: {os.path.basename(csv_path)}") if not csv_path or not os.path.exists(csv_path): print("エラー: CSVファイルが見つかりません。") return [] # ファイルのエンコーディングを特定 for encoding in encodings_to_try: if interrupt_event.is_set(): print("CSV読み込み中に中断リクエスト検出"); return [] # 中断チェック try: with open(csv_path, 'r', encoding=encoding, errors='strict') as f: f.read(1024) # ファイルの一部を読んでエンコーディングを確認 file_encoding = encoding print(f" エンコーディング '{encoding}' で読み込み試行...") break except (UnicodeDecodeError, LookupError): continue # 次のエンコーディングを試す except Exception as e_enc: print(f" '{encoding}' 試行中に予期せぬエラー: {e_enc}") continue if not file_encoding: print(f"エラー: ファイル '{os.path.basename(csv_path)}' を読み込めるエンコーディングが見つかりません。") return [] line_num = 0 try: with open(csv_path, 'r', encoding=file_encoding, newline='') as f: reader = csv.reader(f) try: if interrupt_event.is_set(): raise InterruptedError("CSV読み込み中に中断リクエスト") # 中断チェック header = next(reader) # 最初の行を読み込む line_num += 1 print(f" 1行目 (ヘッダー可能性あり): {header}") except StopIteration: print("情報: CSVファイルが空です。") return [] # ファイルが空なら終了 except InterruptedError as e_interrupt: print(e_interrupt) return [] # 1行目がヘッダーかどうかを判定 (簡易的) header_keywords = ['query', 'search', 'keyword', 'クエリ', '検索', 'キーワード', '店舗', '会社'] first_col_header = header[0].strip().lower() if header else "" is_header = any(hkw in first_col_header for hkw in header_keywords) # 1行目がヘッダーでなく、かつ内容があればクエリとして追加 if not is_header and header and header[0].strip(): queries.append(header[0].strip()) elif is_header: print(" 1行目はヘッダーと判断しスキップします。") # 2行目以降を処理 for row in reader: if interrupt_event.is_set(): raise InterruptedError("CSV読み込み中に中断リクエスト") # 中断チェック line_num += 1 # 1列目にデータがあればクエリとして追加 if row and row[0].strip(): queries.append(row[0].strip()) # 1列目が空でも他の列にデータがあれば警告を表示 (スキップ対象) elif any(cell.strip() for cell in row): print(f"警告: 行 {line_num} の1列目が空です: {row}。スキップします。") print(f" CSVから {len(queries)} 件の有効なクエリを抽出しました。") except InterruptedError as e_interrupt: # 中断をキャッチ print(e_interrupt) print(f"中断リクエストにより、{len(queries)} 件のクエリまで読み込みました。") return queries # 途中までのクエリを返す except Exception as e: # CSV処理中のエラーハンドリング print(f"★★★★★ CSVファイル処理中にエラー (行 {line_num}) ★★★★★") print(f"エラータイプ: {type(e).__name__}") print(f"エラーメッセージ: {e}") print("--- スタックトレース ---") print(traceback.format_exc()) print("----------------------") return [] # エラー発生時は空リストを返す return queries # --- Single Query Processing Function (From Script 1, 中断チェック強化) --- def process_single_query_full_list(driver, query, query_index, output_dir, wait_config): """単一クエリ処理: 検索→リストスクロール→リンク抽出→詳細ページ→口コミタブ→口コミスクロール→「もっと見る」クリック→HTML取得→解析 (中断チェックあり)""" print(f"\n--- クエリ処理開始 [Index:{query_index}] ---: {query}") results_list = [] safe_query_part = re.sub(r'[\\/*?:"<>|]', '_', query)[:30].strip() or "empty_query" base_url = "https://www.google.com/maps/" # 待機時間設定 WAIT_TIME_BASE = wait_config['base'] WAIT_TIME_DETAIL = wait_config['detail'] WAIT_TIME_SEARCH = wait_config['search'] # スクロール設定 SCROLL_PAUSE_TIME = max(1.5, WAIT_TIME_BASE * 0.5) MAX_SCROLL_ATTEMPTS = 30 SCROLL_PAUSE_TIME_REVIEW = max(1.0, WAIT_TIME_BASE * 0.3) MAX_SCROLL_ATTEMPTS_REVIEW = 500 # 口コミは多い場合があるので回数を増やす REVIEW_SCROLL_STUCK_LIMIT = 5 # 口コミスクロール停止判定の閾値 try: # --- 中断チェック --- if interrupt_event.is_set(): raise InterruptedError("処理開始前に中断リクエスト") # 1. 検索実行とリスト表示待機 search_url = f"https://www.google.com/maps/search/{query.replace(' ', '+')}" print(f" URLにアクセス: {search_url}") driver.get(search_url) if interrupt_event.is_set(): raise InterruptedError("ページ読み込み後に中断リクエスト") print(f" 検索結果リスト表示待機 (最大{WAIT_TIME_SEARCH}秒)...") list_container_selector = 'div[role="feed"], div[aria-label*="の検索結果"]' try: # WebDriverWait も中断可能にするのは難しいので、ここではそのまま list_container = WebDriverWait(driver, WAIT_TIME_SEARCH).until( EC.presence_of_element_located((By.CSS_SELECTOR, list_container_selector)) ) WebDriverWait(driver, 10).until( EC.visibility_of_element_located((By.CSS_SELECTOR, f'{list_container_selector} a[href*="/maps/place/"]')) ) print(" 検索結果リスト表示を確認。") except TimeoutException as e_timeout: print(f" エラー: 検索結果リストの表示タイムアウト。URL: {search_url}\n{e_timeout}") results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': search_url, 'html_filename': 'N/A', 'name': f'Error (List Timeout)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: List Timeout'}) return results_list except Exception as e_wait: print(f"★★★★★ リスト待機中に予期せぬエラー ★★★★★\nURL: {search_url}\n{type(e_wait).__name__}: {e_wait}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': search_url, 'html_filename': 'N/A', 'name': f'Error (List Wait Exception)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: List Wait Exception'}) return results_list # 2. 検索リストのスクロール print(" 検索リストをスクロールして全結果を表示...") last_height = driver.execute_script("return arguments[0].scrollHeight", list_container) scroll_attempts = 0 stuck_count = 0 while scroll_attempts < MAX_SCROLL_ATTEMPTS: if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 中断チェック try: driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', list_container) interruptible_sleep(SCROLL_PAUSE_TIME) # 中断可能な待機 if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 待機後にもチェック new_height = driver.execute_script("return arguments[0].scrollHeight", list_container) end_markers = driver.find_elements(By.XPATH, "//span[contains(text(), '結果は以上です')] | //p[contains(text(), '結果は以上です')]") if any(el.is_displayed() for el in end_markers): print(" 「結果は以上です」表示確認。検索リストスクロール終了。") break if new_height == last_height: stuck_count += 1 # print(f" 検索リストスクロール高さ変化なし ({stuck_count}回目)。再試行...") interruptible_sleep(SCROLL_PAUSE_TIME * 1.5) # 中断可能な待機 if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 待機後にもチェック new_height = driver.execute_script("return arguments[0].scrollHeight", list_container) if new_height == last_height and stuck_count >= 3: print(" 高さ変化なしが続いたため、検索リストスクロール終了と判断。") break else: stuck_count = 0 last_height = new_height except Exception as e_scroll: if interrupt_event.is_set(): raise InterruptedError("検索リストスクロールエラー処理中に中断リクエスト") # エラー処理中もチェック print(f"★★★★★ 検索リストスクロール中にエラー ★★★★★\n{type(e_scroll).__name__}: {e_scroll}") print(" スクロールエラー発生。可能な範囲で続行します。") scroll_attempts += 1 if scroll_attempts >= MAX_SCROLL_ATTEMPTS: print(f" 検索リスト最大スクロール回数 ({MAX_SCROLL_ATTEMPTS}) 到達。") # 3. リンク抽出 if interrupt_event.is_set(): raise InterruptedError("リンク抽出前に中断リクエスト") # 中断チェック print(" 検索結果リストからリンクを抽出...") unique_place_links = set() result_card_selector = '.hfpxzc' try: list_container_updated = WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.CSS_SELECTOR, list_container_selector)) ) result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) # print(f" '{result_card_selector}' 要素を {len(result_cards)} 件発見。") if not result_cards: # print(f" 警告: '{result_card_selector}' が見つかりません。代替セレクタ 'a.hfpxzc' で試行...") result_card_selector = 'a.hfpxzc' result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) # print(f" 代替セレクタで {len(result_cards)} 件発見。") if not result_cards: # print(f" 警告: 代替セレクタ 'a.Nv2PK' で試行...") result_card_selector = 'a.Nv2PK' result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) # print(f" 代替セレクタで {len(result_cards)} 件発見。") link_extraction_errors = 0 for card_idx, card in enumerate(result_cards): if interrupt_event.is_set(): raise InterruptedError("リンク抽出ループ中に中断リクエスト") # 中断チェック try: link_element = None if card.tag_name == 'a': link_element = card else: try: link_element = card.find_element(By.TAG_NAME, 'a') except NoSuchElementException: continue if link_element: href = link_element.get_attribute('href') if href and "/maps/place/" in href and not href.startswith("javascript:"): absolute_href = urljoin(base_url, href) unique_place_links.add(absolute_href) except StaleElementReferenceException: link_extraction_errors += 1 continue except Exception as e_extract_link: print(f"★★★★★ カード {card_idx+1} からのリンク抽出エラー ★★★★★\n{type(e_extract_link).__name__}: {e_extract_link}") link_extraction_errors += 1 if link_extraction_errors > 0: print(f" リンク抽出中に {link_extraction_errors} 件のエラーが発生しました。") print(f" 抽出したユニークリンク数: {len(unique_place_links)}") except Exception as e_find_links: print(f"★★★★★ リンク抽出プロセス全体でエラー ★★★★★\n使用したセレクタ: '{result_card_selector}'\n{type(e_find_links).__name__}: {e_find_links}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': driver.current_url, 'html_filename': 'N/A', 'name': f'Error (Link Extraction Fail)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: Link Extraction Fail'}) return results_list if not unique_place_links: print(" 有効な詳細ページリンクが見つかりませんでした。このクエリの結果はありません。") results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': driver.current_url, 'html_filename': 'N/A', 'name': 'No Results Found', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Success: No Results'}) return results_list # 4. 各リンクの詳細ページを処理 print(f" {len(unique_place_links)} 件の詳細情報を取得...") link_list = sorted(list(unique_place_links)) processed_urls = set() for i, place_url in enumerate(link_list, 1): if interrupt_event.is_set(): raise InterruptedError("詳細ページ処理ループ開始前に中断リクエスト") # 中断チェック if place_url in processed_urls: continue processed_urls.add(place_url) print(f"\n --- 詳細取得 [Query:{query_index}, Result:{i}/{len(link_list)}] ---") result_details = {'query_index': query_index, 'original_query': query, 'result_rank': i, 'place_url': place_url, 'html_filename': 'N/A', 'name': 'N/A', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Pending', 'extraction_error': None} try: print(f" 詳細ページに遷移: {place_url}") driver.get(place_url) if interrupt_event.is_set(): raise InterruptedError("詳細ページ読み込み後に中断リクエスト") WebDriverWait(driver, WAIT_TIME_DETAIL).until( EC.visibility_of_element_located((By.CSS_SELECTOR, 'h1')) ) interruptible_sleep(WAIT_TIME_BASE * 0.2) # 中断可能な待機 if interrupt_event.is_set(): raise InterruptedError("詳細ページ待機後に中断リクエスト") # --- 口コミタブをクリック --- review_tab_text = "クチコミ" review_tab_xpath = f"//button[@role='tab'][contains(., '{review_tab_text}') or contains(@aria-label, '{review_tab_text}')]" review_tab_clicked = False review_scroll_element = None try: # print(f" {review_tab_text}タブ クリック試行...") review_tab = WebDriverWait(driver, 10).until( EC.element_to_be_clickable((By.XPATH, review_tab_xpath)) ) driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", review_tab) interruptible_sleep(0.3) if interrupt_event.is_set(): raise InterruptedError("口コミタブクリック前に中断リクエスト") driver.execute_script("arguments[0].click();", review_tab) review_tab_clicked = True print(f" {review_tab_text}タブをクリックしました。口コミコンテナ表示待機...") review_container_selector = 'div.GHT2ce.NsCY4' first_review_card_selector = f'{review_container_selector} div.jftiEf:first-of-type, {review_container_selector} div.MyEned:first-of-type' review_scroll_element = WebDriverWait(driver, WAIT_TIME_DETAIL).until( EC.visibility_of_element_located((By.CSS_SELECTOR, review_container_selector)) ) WebDriverWait(driver, 5).until( EC.visibility_of_element_located((By.CSS_SELECTOR, first_review_card_selector)) ) print(f" 口コミコンテナ表示確認、スクロール要素取得。") interruptible_sleep(WAIT_TIME_BASE * 0.5) if interrupt_event.is_set(): raise InterruptedError("口コミコンテナ待機後に中断リクエスト") except TimeoutException: print(f" 警告: {review_tab_text}タブまたは口コミコンテナの表示タイムアウト。") except ElementClickInterceptedException: print(f" 警告: {review_tab_text}タブのクリックが遮られました。") except NoSuchElementException: print(f" 警告: {review_tab_text}タブが見つかりません。") except Exception as e_click_review: print(f"★★★★★ {review_tab_text}タブ処理中に予期せぬエラー ★★★★★\n{type(e_click_review).__name__}: {e_click_review}") # --- 口コミエリアのスクロール処理 --- if review_scroll_element: print(" 口コミエリアをスクロールして全件表示試行...") review_last_height = driver.execute_script("return arguments[0].scrollHeight", review_scroll_element) review_scroll_attempts = 0 review_stuck_count = 0 while review_scroll_attempts < MAX_SCROLL_ATTEMPTS_REVIEW: if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 中断チェック try: driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', review_scroll_element) interruptible_sleep(SCROLL_PAUSE_TIME_REVIEW) # 中断可能な待機 if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 待機後にもチェック review_new_height = driver.execute_script("return arguments[0].scrollHeight", review_scroll_element) if review_new_height == review_last_height: review_stuck_count += 1 if review_stuck_count >= REVIEW_SCROLL_STUCK_LIMIT: print(f" 口コミスクロール高さが{REVIEW_SCROLL_STUCK_LIMIT}回変化なし。スクロール終了と判断。") break else: interruptible_sleep(SCROLL_PAUSE_TIME_REVIEW * 2) # 中断可能な待機 if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 待機後にもチェック else: review_stuck_count = 0 review_last_height = review_new_height except Exception as e_review_scroll: if interrupt_event.is_set(): raise InterruptedError("口コミスクロールエラー処理中に中断リクエスト") print(f"★★★★★ 口コミスクロール中にエラー ★★★★★\n{type(e_review_scroll).__name__}: {e_review_scroll}") print(" 口コミスクロールエラー発生。可能な範囲で続行します。") break review_scroll_attempts += 1 if review_scroll_attempts >= MAX_SCROLL_ATTEMPTS_REVIEW: print(f" 最大口コミスクロール回数 ({MAX_SCROLL_ATTEMPTS_REVIEW}) 到達。") print(" 口コミエリアのスクロール完了。") elif review_tab_clicked: print(" 警告: 口コミスクロール要素が見つからなかったため、口コミスクロールをスキップします。") # --- 「もっと見る」ボタンをクリック --- if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック前に中断リクエスト") if review_tab_clicked and review_scroll_element: print(" 「もっと見る」ボタンを検索してクリック試行...") more_buttons_xpath = "//button[contains(text(), 'もっと見る')]" clicked_count = 0 click_attempts = 0 max_click_attempts = 3 while click_attempts < max_click_attempts: if interrupt_event.is_set(): raise InterruptedError("「もっと見る」ループ中に中断リクエスト") # 中断チェック buttons_found_this_round = 0 try: more_buttons = driver.find_elements(By.XPATH, more_buttons_xpath) if not more_buttons: # if click_attempts == 0: print(" 「もっと見る」ボタンが見つかりませんでした。") # else: print(f" 追加の「もっと見る」ボタンは見つかりませんでした (試行 {click_attempts+1}/{max_click_attempts})。") break # print(f" 「もっと見る」ボタンを {len(more_buttons)} 個発見 (試行 {click_attempts+1}/{max_click_attempts})。クリック開始...") for btn_idx, button in enumerate(more_buttons): if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") # 中断チェック try: if button.is_displayed() and button.is_enabled(): driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", button) interruptible_sleep(0.2) if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") driver.execute_script("arguments[0].click();", button) clicked_count += 1 buttons_found_this_round += 1 interruptible_sleep(0.3) if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") except ElementClickInterceptedException: print(f" ボタン {btn_idx+1} のクリックが遮られました。スキップします。") except StaleElementReferenceException: print(f" ボタン {btn_idx+1} が古くなりました。スキップします。") except Exception as e_click_more: print(f" ボタン {btn_idx+1} のクリック中にエラー: {e_click_more}") # print(f" 今回の試行で {buttons_found_this_round} 個の「もっと見る」ボタンをクリックしました。") if buttons_found_this_round == 0: # print(" これ以上クリックできる「もっと見る」ボタンはありませんでした。") break except Exception as e_find_more: if interrupt_event.is_set(): raise InterruptedError("「もっと見る」検索エラー処理中に中断リクエスト") print(f"★★★★★ 「もっと見る」ボタン検索中にエラー ★★★★★\n{type(e_find_more).__name__}: {e_find_more}") break click_attempts += 1 if click_attempts < max_click_attempts: interruptible_sleep(1.0) if interrupt_event.is_set(): raise InterruptedError("「もっと見る」試行間待機中に中断リクエスト") if clicked_count > 0: print(f" 合計 {clicked_count} 個の「もっと見る」ボタンをクリックしました。") # else: print(" クリックされた「もっと見る」ボタンはありませんでした。") interruptible_sleep(WAIT_TIME_BASE * 0.5) if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック後に中断リクエスト") # --- HTML取得と保存 --- print(" ページのHTMLを取得・保存中...") detail_html_content = "" try: if interrupt_event.is_set(): raise InterruptedError("HTML取得前に中断リクエスト") detail_html_content = driver.page_source temp_name = 'N/A' try: temp_name = driver.find_element(By.TAG_NAME, 'h1').text except: pass safe_place_name_part = re.sub(r'[\\/*?:"<>|]', '_', temp_name)[:20].strip() or "no_name" tab_suffix = "_reviews_expanded" if review_tab_clicked else "_overview" # クエリごとのサブディレクトリを作成 query_subdir = os.path.join(output_dir, f"Q{query_index:03d}_{safe_query_part}") os.makedirs(query_subdir, exist_ok=True) detail_html_fname = f"R{i:03d}_{safe_place_name_part}{tab_suffix}.html" detail_html_path = os.path.join(query_subdir, detail_html_fname) with open(detail_html_path, 'w', encoding='utf-8') as f: f.write(detail_html_content) # 相対パスを保存 result_details['html_filename'] = os.path.join(f"Q{query_index:03d}_{safe_query_part}", detail_html_fname) print(f" HTMLを保存しました: {result_details['html_filename']}") except Exception as e_save_html: print(f" HTML取得/保存エラー: {e_save_html}") result_details['html_filename'] = 'Error Saving HTML' # --- HTML解析 --- if detail_html_content: print(" HTMLを解析して情報を抽出中...") if interrupt_event.is_set(): raise InterruptedError("HTML解析前に中断リクエスト") extracted_info = extract_details_and_reviews_from_html(detail_html_content) result_details.update(extracted_info) # 抽出関数内で中断された場合、ステータスが'Interrupted'になっているはず if result_details.get('status') != 'Interrupted': if result_details.get('extraction_error'): result_details['status'] = f"Warning: HTML Extraction Error" else: result_details['status'] = 'Success' print(" HTML解析完了。") else: print(" エラー: HTMLコンテンツが空のため、情報抽出をスキップします。") result_details['status'] = 'Error: Empty HTML Content' except TimeoutException as e_timeout_detail: print(f"★★★★★ 詳細ページ読み込みタイムアウト ★★★★★\nURL: {place_url}") result_details['status'] = f'Error: Detail Page Timeout'; result_details['name'] = f"Error (Timeout R:{i})" except NoSuchElementException as e_nse: print(f"★★★★★ 詳細ページで必須要素(h1など)が見つかりません ★★★★★\nURL: {place_url}") result_details['status'] = f'Error: Detail Page Missing Element (e.g., h1)'; result_details['name'] = f"Error (ElementNotFound R:{i})" except Exception as e_detail: if interrupt_event.is_set(): raise InterruptedError("詳細ページ例外処理中に中断リクエスト") # 例外処理中もチェック print(f"★★★★★ 詳細ページ処理中に予期せぬエラー ★★★★★\nURL: {place_url}\n{type(e_detail).__name__}: {e_detail}") result_details['status'] = f'Error: Detail Page Exception - {type(e_detail).__name__}'; result_details['name'] = f"Error (Exception R:{i})" finally: # 中断された場合、ステータスを上書き if interrupt_event.is_set() and result_details.get('status') != 'Interrupted': result_details['status'] = 'Interrupted' results_list.append(result_details) except InterruptedError as e_interrupt: # クエリ処理全体で中断をキャッチ print(f"★★★★★ クエリ '{query}' [Index:{query_index}] の処理中に中断リクエスト: {e_interrupt} ★★★★★") # 中断されたことを示す結果を追加 results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 'N/A', 'place_url': 'N/A', 'html_filename': 'N/A', 'name': f'Interrupted Query {query_index}', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Interrupted'}) # ★重要★ 中断例外を再度発生させ、run_scraping関数に中断を伝える raise e_interrupt except Exception as e_main_query: print(f"★★★★★ クエリ '{query}' [Index:{query_index}] の処理全体でエラー ★★★★★\n{type(e_main_query).__name__}: {e_main_query}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': 'N/A', 'html_filename': 'N/A', 'name': f'Error (Overall Query {query_index})', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: Query Level Exception - {type(e_main_query).__name__}'}) finally: status_msg = "中断" if interrupt_event.is_set() else "完了" print(f"--- クエリ処理{status_msg} [Index:{query_index}] - {len(results_list)} 件の結果 ---") return results_list # --- 中断リクエスト用関数 (From Script 1) --- def request_interrupt(): """中断フラグをセットする""" if not interrupt_event.is_set(): print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") print("!!! 中断リクエストを受け付けました。 !!!") print("!!! 現在のスクレイピング処理が完了次第、停止します... !!!") print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n") interrupt_event.set() else: print("\n--- 中断は既にリクエストされています ---") return "[中断リクエスト受信]" # --- Gradio Processing Function (From Script 1, 中断処理対応, 途中ダウンロード削除) --- def run_scraping(input_csv_file, output_dir_name, output_csv_name, csv_encoding, wait_time_base, wait_time_detail, wait_time_search, headless_mode, progress=gr.Progress()): """Gradioインターフェースから呼び出されるスクレイピング処理関数""" log_stream = io.StringIO() # ログ出力用 start_time_total = time.time() # 全体処理時間計測開始 driver = None # WebDriverオブジェクト初期化 processed_query_count = 0 # 処理済みクエリ数 total_results_count = 0 # CSV書き込み総行数 total_queries = 0 # 総クエリ数 output_csv_path = None # 出力CSVファイルパス html_base_output_dir = None # HTML出力ベースディレクトリ interrupted_flag = False # 処理が中断されたかを示すフラグ # --- 中断フラグをリセット --- interrupt_event.clear() print("中断フラグをリセットしました。", file=log_stream) # 標準出力と標準エラー出力をログストリームにリダイレクト with contextlib.redirect_stdout(log_stream), contextlib.redirect_stderr(log_stream): try: print("=== スクレイピング処理開始 ===") print(f"開始時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # 入力ファイルチェック if input_csv_file is None: print("エラー: クエリCSVファイルが選択されていません。処理を中断します。") yield log_stream.getvalue(), None, None # ログ, 結果CSV, HTMLフォルダパス return yield log_stream.getvalue(), None, None # 初期ログをUIに反映 # パラメータ設定 SEARCH_QUERIES_CSV_PATH = input_csv_file.name OUTPUT_DIR = output_dir_name.strip() or "gmap_scraping_output" OUTPUT_CSV_FILENAME = output_csv_name.strip() or "scraping_results.csv" CSV_ENCODING = csv_encoding try: wait_config = { 'base': max(1.0, float(wait_time_base)), 'detail': max(10.0, float(wait_time_detail)), 'search': max(5.0, float(wait_time_search)) } except ValueError: print("警告: 待機時間に無効な値が入力されました。デフォルト値を使用します。") wait_config = {'base': 4.0, 'detail': 25.0, 'search': 15.0} print(f"待機時間設定: 基本={wait_config['base']}秒, 詳細/口コミ={wait_config['detail']}秒, 検索={wait_config['search']}秒") yield log_stream.getvalue(), None, None # 出力ディレクトリ設定と作成 if not os.path.isabs(OUTPUT_DIR): OUTPUT_DIR = os.path.join(os.getcwd(), OUTPUT_DIR) html_base_output_dir = os.path.join(OUTPUT_DIR, "html_files") # HTML保存用サブディレクトリ output_csv_path = os.path.join(OUTPUT_DIR, OUTPUT_CSV_FILENAME) # CSVはメインディレクトリに print(f"HTML出力先ベースディレクトリ: {html_base_output_dir}") print(f"CSV出力先ファイル: {output_csv_path}") os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(html_base_output_dir, exist_ok=True) # HTML用サブディレクトリも作成 yield log_stream.getvalue(), None, html_base_output_dir # HTMLフォルダパスを返す # CSVからクエリ読み込み (中断チェックあり) queries = load_queries(SEARCH_QUERIES_CSV_PATH) yield log_stream.getvalue(), None, html_base_output_dir if interrupt_event.is_set(): # 読み込み中に中断されたかチェック print("CSV読み込み中に中断されたため、処理を終了します。") interrupted_flag = True raise InterruptedError("CSV loading interrupted") # 処理を中断フローへ if not queries: print("エラー: CSVから処理可能なクエリが見つかりませんでした。処理を終了します。") yield log_stream.getvalue(), None, html_base_output_dir return total_queries = len(queries) print(f"{total_queries} 件のクエリを処理します。") yield log_stream.getvalue(), None, html_base_output_dir # --- 中断チェック --- if interrupt_event.is_set(): raise InterruptedError("WebDriver初期化前に中断リクエスト") # WebDriver初期化 progress(0, desc="WebDriver初期化中...") print("\nWebDriver初期化中...") yield log_stream.getvalue(), None, html_base_output_dir options = Options() options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') options.add_argument('--lang=ja-JP') options.add_argument("--window-size=1920,1080") options.add_argument('--disable-extensions') options.add_argument('--disable-blink-features=AutomationControlled') options.add_argument('--disable-gpu') options.add_experimental_option('excludeSwitches', ['enable-automation']) options.add_experimental_option('useAutomationExtension', False) options.add_experimental_option("prefs", { "credentials_enable_service": False, "profile.password_manager_enabled": False }) if headless_mode: print(" ヘッドレスモードで実行します。") options.add_argument('--headless=new') else: print(" 通常モード (非ヘッドレス) で実行します。") try: if IN_COLAB and gs: print(" Colab環境でgoogle_colab_seleniumを使用します。") driver = gs.Chrome(options=options) elif not IN_COLAB and ChromeService and ChromeDriverManager: try: print(" webdriver-managerを使用してChromeDriverパスを解決します...") service = ChromeService(ChromeDriverManager().install()) driver = webdriver.Chrome(service=service, options=options) print(" ChromeDriver (webdriver-manager) 起動成功。") except Exception as e_wdm: print(f" webdriver-managerでの初期化エラー: {e_wdm}") print(" PATH上のChromeDriverで試行します...") driver = webdriver.Chrome(options=options) print(" ChromeDriver (PATH) 起動成功。") elif not IN_COLAB: print(" PATH上のChromeDriverを使用します...") driver = webdriver.Chrome(options=options) print(" ChromeDriver (PATH) 起動成功。") else: raise Exception("WebDriverを初期化できませんでした。適切なWebDriver設定が見つかりません。") driver.implicitly_wait(3) print("WebDriver初期化完了。") except Exception as e_wd_init: print(f"★★★★★ WebDriver初期化失敗 ★★★★★") print(f"エラータイプ: {type(e_wd_init).__name__}") print(f"エラーメッセージ: {e_wd_init}") print("--- スタックトレース ---\n", traceback.format_exc(), "\n----------------------") yield log_stream.getvalue(), None, html_base_output_dir return yield log_stream.getvalue(), None, html_base_output_dir # --- 中断チェック --- if interrupt_event.is_set(): raise InterruptedError("CSV処理開始前に中断リクエスト") # CSVヘッダーを定義 (口コミ本文も個別列にするか検討 → 結合文字列でよさそう) csv_header = ['QueryIndex', 'OriginalQuery', 'ResultRank', 'Status', 'ExtractedName', 'ExtractedWebsite', 'ExtractedPhone', 'ExtractedAddress', 'ReviewCount', 'ReviewsCombined', 'ExtractionError', 'PlaceURL', 'DetailHTMLFilename'] file_exists = os.path.exists(output_csv_path) file_mode = 'a' if file_exists and os.path.getsize(output_csv_path) > 0 else 'w' print(f"結果CSVファイルを '{file_mode}' モードで開きます (パス: {output_csv_path}, エンコーディング: {CSV_ENCODING})。") yield log_stream.getvalue(), None, html_base_output_dir try: with open(output_csv_path, file_mode, newline='', encoding=CSV_ENCODING, errors='replace') as csv_file: writer = csv.writer(csv_file) if file_mode == 'w': print(" 新規CSVファイルのためヘッダー行を書き込みます。") writer.writerow(csv_header) csv_file.flush() elif file_exists: print(f" 既存ファイル '{os.path.basename(output_csv_path)}' に追記します。") for i, query in enumerate(queries, 1): # --- ループ開始時に中断チェック --- if interrupt_event.is_set(): print(f"\n===== クエリ {i}/{total_queries} の処理開始前に中断リクエストを検出 =====") interrupted_flag = True break # ループを抜ける progress(i / total_queries, desc=f"クエリ {i}/{total_queries} 処理中: {query[:30]}...") start_time_query = time.time() print(f"\n===== クエリ {i}/{total_queries} 開始: '{query}' =====") yield log_stream.getvalue(), None, html_base_output_dir results = [] try: # --- 単一クエリのスクレイピング処理実行 (中断例外をキャッチ) --- # HTML保存先として html_base_output_dir を渡す results = process_single_query_full_list(driver, query, i, html_base_output_dir, wait_config) except InterruptedError as e_interrupt_query: print(f"クエリ {i} の処理が中断されました: {e_interrupt_query}") interrupted_flag = True # メインループに中断を伝える if not any(r['status'] == 'Interrupted' for r in results): results.append({'query_index': i, 'original_query': query, 'result_rank': 'N/A', 'status': 'Interrupted', 'name': f'Interrupted Query {i}', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'extraction_error': str(e_interrupt_query), 'place_url': 'N/A', 'html_filename': 'N/A'}) yield log_stream.getvalue(), None, html_base_output_dir # --- 取得結果をCSVに書き込み --- written_count_query = 0 print(f" クエリ {i} の結果をCSVに書き込み中...") for result_data in results: try: reviews_list = result_data.get('reviews', []) review_count = 0 formatted_reviews = "" if isinstance(reviews_list, list) and reviews_list: review_texts = [] for idx, review_item in enumerate(reviews_list): if isinstance(review_item, dict): r_text = str(review_item.get('text', '')).replace('\n', ' ').replace('\r', '') reviewer = review_item.get('reviewer', 'N/A') rating = review_item.get('rating', 'N/A') review_texts.append(f"[{idx+1}] {reviewer} ({rating}): {r_text}") elif isinstance(review_item, str): review_texts.append(f"[{idx+1}] {review_item.replace('n', ' ').replace('r', '')}") formatted_reviews = " || ".join(review_texts) # 区切り文字で結合 review_count = len(reviews_list) elif isinstance(reviews_list, str): # 文字列の場合(エラーメッセージなど) formatted_reviews = reviews_list.replace('\n', ' ').replace('\r', '') extraction_error_msg = result_data.get('extraction_error', '') if extraction_error_msg and len(extraction_error_msg) > 500: extraction_error_msg = extraction_error_msg[:250] + "..." + extraction_error_msg[-250:] # CSVヘッダーに合わせてデータを準備 row_data = [ result_data.get('query_index', i), result_data.get('original_query', query), result_data.get('result_rank', 'N/A'), result_data.get('status', 'Unknown'), result_data.get('name', 'N/A'), result_data.get('url', ''), result_data.get('phone', 'N/A'), result_data.get('address', 'N/A'), review_count, # レビュー数 formatted_reviews, # 結合されたレビュー文字列 extraction_error_msg, result_data.get('place_url', 'N/A'), # HTMLファイル名は output_dir からの相対パス result_data.get('html_filename', 'N/A') ] writer.writerow(row_data) written_count_query += 1 except Exception as e_write: print(f"★★★★★ CSV書き込み中にエラーが発生しました (行スキップ) ★★★★★") print(f"エラーデータ (一部): {str(result_data)[:200]}...") print(f"エラータイプ: {type(e_write).__name__}: {e_write}") csv_file.flush() total_results_count += written_count_query processed_query_count += 1 end_time_query = time.time() query_status_msg = "中断" if result_data.get('status') == 'Interrupted' else "完了" print(f"===== クエリ {i}/{total_queries} {query_status_msg} - {written_count_query}件書き込み, 所要時間: {end_time_query - start_time_query:.2f} 秒 =====") # ここで部分的なCSVを yield しないように変更 yield log_stream.getvalue(), None, html_base_output_dir # 中断フラグが立っていたら、ループを終了 if interrupted_flag: print("\n中断リクエストに従い、次のクエリへ進まず処理を終了します。") break # --- クエリ間の待機 (中断可能) --- if i < total_queries and not interrupted_flag: sleep_duration = wait_config['base'] * 1.5 + (hash(query + str(i)) % (wait_config['base'] * 1.5)) sleep_duration = max(wait_config['base'] * 0.8, min(sleep_duration, wait_config['base'] * 4.0)) print(f"次のクエリまで {sleep_duration:.2f} 秒待機します...") yield log_stream.getvalue(), None, html_base_output_dir interruptible_sleep(sleep_duration) # 待機後にも中断チェック if interrupt_event.is_set(): print("待機中に中断リクエストを検出。処理を終了します。") interrupted_flag = True break # ループを抜ける elif interrupted_flag: pass # 中断されたら待機しない else: print("\n全クエリの処理が完了しました。") except IOError as e_io: print(f"★★★★★ CSVファイル '{output_csv_path}' のオープン/書き込み中にIOエラー ★★★★★") print(f"エラータイプ: {type(e_io).__name__}: {e_io}\n--- Traceback ---\n{traceback.format_exc()}\n----------------------") print("ファイルが他のプログラムで開かれていないか、書き込み権限があるか確認してください。") output_csv_path = None # 結果ファイルパスを無効化 except Exception as e_csv_loop: print(f"★★★★★ CSV処理ループ中に予期せぬエラー ★★★★★") print(f"エラータイプ: {type(e_csv_loop).__name__}: {e_csv_loop}\n--- Traceback ---\n{traceback.format_exc()}\n----------------------") except InterruptedError: # run_scraping全体で中断をキャッチ print("\n★★★★★ スクレイピング処理がユーザーによって中断されました ★★★★★") interrupted_flag = True # 中断フラグを立てる except Exception as e_main: print(f"\n★★★★★ メイン処理 (run_scraping) 中に予期せぬエラーが発生しました ★★★★★") print(f"エラータイプ: {type(e_main).__name__}: {e_main}") print("\n--- スタックトレース ---\n", traceback.format_exc(), "\n----------------------") finally: # --- 終了処理 --- if driver: print("\nWebDriver終了処理中...") try: driver.quit() print("WebDriver正常終了。") except Exception as e_quit: print(f"★★★★★ WebDriver終了時にエラー ★★★★★") print(f"エラータイプ: {type(e_quit).__name__}: {e_quit}") end_time_total = time.time() total_duration_seconds = end_time_total - start_time_total final_status = "中断" if interrupted_flag else "完了" print(f"\n=== スクレイピング全処理終了 ({final_status}) ===") print(f"終了時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"処理{final_status}クエリ数: {processed_query_count}/{total_queries if total_queries > 0 else 'N/A'} 件") print(f"CSV書き込み総行数: {total_results_count} 件") print(f"総処理時間: {total_duration_seconds:.2f} 秒 ({total_duration_seconds/60:.2f} 分)") if interrupted_flag: print("*** スクレイピング処理は途中で中断されました ***") final_log = log_stream.getvalue() # プログレスバーを完了状態にする progress(1.0, desc=f"スクレイピング処理 {final_status}") # 最終的なCSVファイルのパスを返す final_csv_output = None if output_csv_path and os.path.exists(output_csv_path) and os.path.getsize(output_csv_path) > 0: print(f"結果CSVファイル: {output_csv_path}") final_csv_output = gr.File(value=output_csv_path, label=f"スクレイピング結果CSV ({final_status})") elif output_csv_path: print(f"警告: 結果CSVファイル '{output_csv_path}' は空または存在しません。") else: print("結果CSVファイルは生成されませんでした。") # HTMLフォルダパスも返す yield final_log, final_csv_output, html_base_output_dir # --- Helper Functions (From Script 2) --- def normalize_folder_path(folder_path): """フォルダパスを正規化する関数""" try: if not isinstance(folder_path, str): return None # 引用符や余分なスペースを削除 folder_path = folder_path.strip().strip('"').strip("'") # バックスラッシュをスラッシュに変換し、正規化 folder_path = os.path.normpath(folder_path).replace("\\", "/") return folder_path except Exception as e: print(f"フォルダパス正規化エラー: {e}") return None def extract_shop_name_from_html_filename(filename): """HTMLファイル名から店名を抽出する関数 (拡張)""" try: # 例: R001_店舗名_reviews_expanded.html # 例: R001_店舗名_overview.html # 例: Q001_R001_店舗名_クエリ_detail_reviews_expanded.html (古い形式も考慮) base = os.path.basename(filename) # まず拡張子と既知の接尾辞を削除 base = re.sub(r'(_reviews_expanded|_overview)?\.html$', '', base) # ランキング部分 (Rxxx_ または Qxxx_Rxxx_) を削除 base = re.sub(r'^(Q\d+_)?R\d+_', '', base) # 古い形式の可能性のある接尾辞を削除 base = re.sub(r'_detail_overview$|_detail_reviews_expanded$|_detailRESS$', '', base) # 残った部分を店名とする (前後のアンダースコアや空白トリム) shop_name = base.replace('_', ' ').strip() return shop_name if shop_name else filename except: return filename # エラー時は元のファイル名を返す def collect_reviews_from_html(folder_path, progress=gr.Progress()): """指定フォルダ内のHTMLから口コミデータを収集する関数""" reviews_data = [] log_stream = io.StringIO() # フォルダパスを正規化 folder_path = normalize_folder_path(folder_path) if not folder_path: print("エラー: 無効なHTMLフォルダパスです。", file=log_stream) return pd.DataFrame(), log_stream.getvalue() # フォルダの存在確認 if not os.path.exists(folder_path): print(f"エラー: フォルダ '{folder_path}' が見つかりません。", file=log_stream) return pd.DataFrame(), log_stream.getvalue() # フォルダ内のすべてのHTMLファイルを処理対象とする try: all_files = [] # 再帰的にサブディレクトリも探索 for root, _, files in os.walk(folder_path): for filename in files: if filename.lower().endswith(".html"): all_files.append(os.path.join(root, filename)) if not all_files: print(f"警告: '{folder_path}' 以下にHTMLファイルが見つかりません。", file=log_stream) return pd.DataFrame(), log_stream.getvalue() print(f"処理対象のHTMLファイル数: {len(all_files)}", file=log_stream) total_files = len(all_files) for i, file_path in enumerate(all_files): progress(i / total_files, desc=f"HTML解析中 {i}/{total_files}") filename = os.path.basename(file_path) relative_path = os.path.relpath(file_path, folder_path) # ベースフォルダからの相対パス # 店名をファイル名から抽出 shop_name = extract_shop_name_from_html_filename(filename) # HTMLファイルを読み込む try: with open(file_path, "r", encoding="utf-8") as file: html_content = file.read() except Exception as e: print(f"ファイル '{relative_path}' の読み込みエラー: {e}", file=log_stream) continue # BeautifulSoupでパース soup = BeautifulSoup(html_content, 'lxml' if 'lxml' in globals() else 'html.parser') # 基本情報も念のため抽出 (h1があれば店名として優先) h1_tag = soup.find('h1') if h1_tag: shop_name = h1_tag.get_text(strip=True) # 口コミカードを取得 (Script 1の抽出ロジックに合わせる: jftiEf or MyEned) review_card_selectors = ['div.jftiEf', 'div.MyEned'] review_cards = [] for sel in review_card_selectors: review_cards = soup.select(sel) if review_cards: break if not review_cards: # print(f"ファイル '{relative_path}' に口コミデータが見つかりません (jftiEf/MyEned)。", file=log_stream) # 口コミがなくても、店舗情報だけは記録するかもしれない(オプション) # reviews_data.append({ # "ファイル名": relative_path, "店名": shop_name, "投稿者": "N/A", # "投稿者情報": "N/A", "評価": "N/A", "投稿時期": "N/A", # "口コミ本文": "口コミなし", "オーナーからの返信": "N/A" # }) continue for card_idx, card in enumerate(review_cards): try: # 投稿者名 (.d4r55) reviewer_el = card.select_one('.d4r55') reviewer_name = reviewer_el.get_text(strip=True) if reviewer_el else "不明" # 投稿者情報(ローカルガイド情報など .RfnDt) reviewer_info_el = card.select_one('.RfnDt') reviewer_details = reviewer_info_el.get_text(strip=True) if reviewer_info_el else "なし" # 評価 (.kvMYJc aria-label) rating_el = card.select_one('.kvMYJc') rating_value = "不明" if rating_el and rating_el.get('aria-label'): match = re.search(r'星 (\d+(\.\d+)?)', rating_el['aria-label']) if match: rating_value = f"星 {match.group(1)}" # "星 X.X" 形式で保存 # 投稿時期 (.rsqaWe) post_time_el = card.select_one('.rsqaWe') post_time_value = post_time_el.get_text(strip=True) if post_time_el else "不明" # 口コミ本文 (span.wiI7pd 優先) review_text_el = card.select_one('span.wiI7pd') if not review_text_el: review_text_el = card.select_one('span[jscontroller="MZnM8e"]') # フォールバック review_content = review_text_el.get_text(strip=True) if review_text_el else "なし" # オーナーからの返信 (.CDe7pd) - 注意: これは古いセレクタかもしれない # 新しい構造では返信は別のdiv構造になっている可能性がある # 簡単のため、一旦 .CDe7pd を試す owner_response_el = card.select_one('.CDe7pd') owner_response_text = owner_response_el.get_text(strip=True) if owner_response_el else "なし" # データを辞書形式で保存 review_data = { "ファイル名": relative_path, "店名": shop_name, "投稿者": reviewer_name, "投稿者情報": reviewer_details, "評価": rating_value, "投稿時期": post_time_value, "口コミ本文": review_content, "オーナーからの返信": owner_response_text } reviews_data.append(review_data) except Exception as e_card: print(f"ファイル '{relative_path}' の口コミカード {card_idx+1} の解析中にエラー: {e_card}", file=log_stream) progress(1.0, desc="HTML解析完了") except Exception as e_folder: print(f"フォルダ '{folder_path}' の処理中に予期せぬエラー: {e_folder}", file=log_stream) print(traceback.format_exc(), file=log_stream) return pd.DataFrame(), log_stream.getvalue() # DataFrameに変換 df = pd.DataFrame(reviews_data) if df.empty: print("警告: 口コミデータが収集できませんでした。", file=log_stream) else: print(f"収集された口コミデータ件数: {len(df)}", file=log_stream) # print("DataFrameの列:", df.columns.tolist(), file=log_stream) # デバッグ用 return df, log_stream.getvalue() # --- Functions for Review Search Tab --- def get_csv_columns_safe(csv_file_obj): """アップロードされたCSVファイルから安全に列名を取得する""" if csv_file_obj is None: return gr.Dropdown(choices=[], label="検索対象の列 (CSVをアップロードしてください)") try: # pandasで読み込んで列名を取得 # TODO: エンコーディング自動判別を追加した方が良いかも df_peek = pd.read_csv(csv_file_obj.name, nrows=5) # 先頭数行だけ読む columns = df_peek.columns.tolist() # "ReviewsCombined" や "口コミ" など、検索に適した列をデフォルトで選択させる候補 default_col = next((c for c in columns if c.lower() in ['reviewscombined', '口コミ', '口コミ本文', 'text', 'review']), columns[0] if columns else None) return gr.Dropdown(choices=columns, value=default_col, label="検索対象の列") except Exception as e: print(f"CSV列名取得エラー: {e}") return gr.Dropdown(choices=[], label=f"列名取得エラー: {e}") def search_reviews_controller(search_source, html_folder_path, uploaded_csv_file, search_column, keyword, progress=gr.Progress()): """口コミ検索のコントローラー関数""" log_stream = io.StringIO() df = pd.DataFrame() search_results_df = pd.DataFrame() temp_csv_path = None results_text = "" print(f"検索ソース: {search_source}", file=log_stream) try: if search_source == "HTMLフォルダから検索": if not html_folder_path: results_text = "エラー: HTMLフォルダパスを入力してください。" return results_text, None, None, log_stream.getvalue() print(f"HTMLフォルダから口コミを収集中: {html_folder_path}", file=log_stream) df, collect_log = collect_reviews_from_html(html_folder_path, progress) log_stream.write(collect_log) search_col_actual = "口コミ本文" # HTMLからの場合はこの列を検索 if df.empty: results_text = f"エラー: フォルダ '{html_folder_path}' から口コミデータが収集できませんでした。" elif search_col_actual not in df.columns: results_text = f"エラー: 収集したデータに '{search_col_actual}' 列が見つかりません。" elif search_source == "CSVファイルから検索": if uploaded_csv_file is None: results_text = "エラー: 検索対象のCSVファイルをアップロードしてください。" return results_text, None, None, log_stream.getvalue() if not search_column: results_text = "エラー: 検索対象の列を選択してください。" return results_text, None, None, log_stream.getvalue() print(f"アップロードされたCSVから検索: {os.path.basename(uploaded_csv_file.name)}, 列: {search_column}", file=log_stream) try: # TODO: エンコーディングを考慮 df = pd.read_csv(uploaded_csv_file.name) search_col_actual = search_column if search_col_actual not in df.columns: results_text = f"エラー: アップロードされたCSVに列 '{search_col_actual}' が見つかりません。" except Exception as e_csv: results_text = f"エラー: CSVファイルの読み込みに失敗しました。ファイル形式やエンコーディングを確認してください。\n{e_csv}" else: results_text = "エラー: 不明な検索ソースです。" # データフレームと検索列が有効かチェック if results_text: # 上記のいずれかでエラーが発生した場合 pass elif df.empty: if search_source == "HTMLフォルダから検索": results_text = "情報: 収集された口コミデータがありませんでした。" else: results_text = "エラー: CSVからデータを読み込めませんでした。" elif not keyword or keyword.strip() == "": results_text = "情報: キーワードが入力されていません。全件表示します。" search_results_df = df # キーワード空欄時は全件 else: keyword = keyword.strip() print(f"キーワード '{keyword}' で列 '{search_col_actual}' を検索中...", file=log_stream) try: # NaNを空文字列に変換してから検索 search_results_df = df[df[search_col_actual].fillna('').astype(str).str.contains(keyword, case=False, na=False)] count = len(search_results_df) if count > 0: results_text = f"キーワード '{keyword}' を含む口コミが {count} 件見つかりました。" print(results_text, file=log_stream) else: results_text = f"キーワード '{keyword}' を含む口コミは見つかりませんでした。" print(results_text, file=log_stream) except KeyError: results_text = f"エラー: DataFrameに検索対象列 '{search_col_actual}' が見つかりません。" print(results_text, file=log_stream) except Exception as e_search: results_text = f"検索中にエラーが発生しました: {e_search}" print(results_text, file=log_stream) print(traceback.format_exc(), file=log_stream) # 結果をCSVに保存 (検索結果がある場合) if not search_results_df.empty: try: with tempfile.NamedTemporaryFile(delete=False, suffix=".csv", mode="w", encoding="utf-8-sig") as temp_file: search_results_df.to_csv(temp_file.name, index=False) temp_csv_path = temp_file.name print(f"検索結果を一時CSVファイルに保存しました: {temp_csv_path}", file=log_stream) except Exception as e_csv_save: print(f"検索結果のCSV保存中にエラー: {e_csv_save}", file=log_stream) results_text += "\n警告: 検索結果のCSV保存に失敗しました。" # テーブル表示用に列を絞る(存在しない列は無視) display_columns = ['店名', '投稿者', '評価', '投稿時期', '口コミ本文', 'オーナーからの返信', 'ファイル名'] if search_source == "CSVファイルから検索" and not search_results_df.empty: # CSVからの場合、元の列名を優先しつつ、なければHTML由来の列名も試す available_cols = search_results_df.columns.tolist() display_columns = [col for col in available_cols if col in display_columns or col == search_column] # 検索列も表示 # 存在しない列を除外してDataFrameを返す display_df = search_results_df[[col for col in display_columns if col in search_results_df.columns]] if not search_results_df.empty else pd.DataFrame() except Exception as e_controller: error_msg = f"口コミ検索コントローラーで予期せぬエラー: {e_controller}" print(error_msg, file=log_stream) print(traceback.format_exc(), file=log_stream) results_text = error_msg return results_text, display_df, gr.File(value=temp_csv_path if temp_csv_path else None), log_stream.getvalue() def export_all_reviews_controller(search_source, html_folder_path, uploaded_csv_file, progress=gr.Progress()): """全口コミデータをCSVにエクスポートするコントローラー関数""" log_stream = io.StringIO() df = pd.DataFrame() temp_csv_path = None results_text = "" print(f"全件エクスポート開始。ソース: {search_source}", file=log_stream) try: if search_source == "HTMLフォルダから検索": if not html_folder_path: results_text = "エラー: HTMLフォルダパスを入力してください。" return results_text, None, log_stream.getvalue() print(f"HTMLフォルダから全口コミを収集中: {html_folder_path}", file=log_stream) df, collect_log = collect_reviews_from_html(html_folder_path, progress) log_stream.write(collect_log) if df.empty: results_text = f"情報: フォルダ '{html_folder_path}' から収集できる口コミデータがありませんでした。" elif search_source == "CSVファイルから検索": if uploaded_csv_file is None: results_text = "エラー: 対象のCSVファイルをアップロードしてください。" return results_text, None, log_stream.getvalue() print(f"アップロードされたCSVをエクスポート対象として読み込み中: {os.path.basename(uploaded_csv_file.name)}", file=log_stream) try: # アップロードされたCSVをそのままデータフレームとする df = pd.read_csv(uploaded_csv_file.name) if df.empty: results_text = "情報: アップロードされたCSVは空です。" except Exception as e_csv: results_text = f"エラー: CSVファイルの読み込みに失敗しました。\n{e_csv}" else: results_text = "エラー: 不明な検索ソースです。" # データフレームが有効で、空でない場合にCSVエクスポート if not df.empty: try: with tempfile.NamedTemporaryFile(delete=False, suffix="_all.csv", mode="w", encoding="utf-8-sig") as temp_file: df.to_csv(temp_file.name, index=False) temp_csv_path = temp_file.name results_text = f"全 {len(df)} 件のデータをCSVファイルにエクスポートしました。" print(results_text, file=log_stream) print(f"エクスポートファイル: {temp_csv_path}", file=log_stream) except Exception as e_csv_save: results_text = f"全件CSVエクスポート中にエラー: {e_csv_save}" print(results_text, file=log_stream) print(traceback.format_exc(), file=log_stream) except Exception as e_controller: error_msg = f"全件エクスポートコントローラーで予期せぬエラー: {e_controller}" print(error_msg, file=log_stream) print(traceback.format_exc(), file=log_stream) results_text = error_msg return results_text, gr.File(value=temp_csv_path if temp_csv_path else None), log_stream.getvalue() # --- Gradio UI 定義 (統合版) --- with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Google Maps スクレイピング & 口コミ検索ツール") gr.Markdown( """ **タブ1:** Google Mapsから店舗情報をスクレイピングし、結果をCSVとHTMLファイルに出力します。 **タブ2:** スクレイピングで保存されたHTMLフォルダ、またはアップロードしたCSVファイルから口コミ情報を検索・エクスポートします。 """ ) with gr.Tabs(): with gr.TabItem("① スクレイピング実行"): gr.Markdown("### Google Maps スクレイピング設定") gr.Markdown( """ CSVクエリで検索し、詳細ページで「クチコミ」タブをクリック後、口コミエリアを**最後までスクロール**し、 さらに**「もっと見る」ボタンを全てクリック**して全件表示を試みます。 その後、基本情報と口コミ情報を抽出し、結果CSVとHTMLファイル群を出力します。 HTMLファイルはクエリごとにサブディレクトリに保存されます。 **「処理中断」ボタン**で進行中のスクレイピング処理を安全に停止できます(現在のクエリ完了後)。 """ ) with gr.Row(): with gr.Column(scale=2): gr.Markdown("#### 入力ファイルと出力設定") input_csv_file_scrape = gr.File(label="検索クエリCSVファイル (1列目のみ使用)", file_types=[".csv"]) output_dir_name_scrape = gr.Textbox(label="出力先ベースディレクトリ名", value="gmap_scraping_output") output_csv_name_scrape = gr.Textbox(label="出力CSVファイル名 (ベースディレクトリ内)", value="scraping_results.csv") csv_encoding_scrape = gr.Dropdown(label="出力CSVエンコーディング", choices=['utf-8-sig', 'cp932'], value='utf-8-sig') headless_mode_scrape = gr.Checkbox(label="ヘッドレスモードで実行 (エラー発生時はOFF推奨)", value=True) with gr.Column(scale=1): gr.Markdown("#### 待機時間設定 (秒)") wait_time_base_scrape = gr.Number(label="基本待機", minimum=1, maximum=20, step=0.5, value=4) wait_time_detail_scrape = gr.Number(label="詳細/口コミ最大待機", minimum=10, maximum=60, step=1, value=25) wait_time_search_scrape = gr.Number(label="検索リスト最大待機", minimum=5, maximum=60, step=1, value=15) with gr.Row(): start_button_scrape = gr.Button("スクレイピング開始", variant="primary", size="lg", scale=3) stop_button_scrape = gr.Button("処理中断", variant="stop", size="lg", scale=1) gr.Markdown("#### 処理ステータスとエラーログ") progress_bar_scrape = gr.Progress(track_tqdm=True) status_textbox_scrape = gr.Textbox(label="ログ", lines=15, interactive=False, autoscroll=True, max_lines=2000) gr.Markdown("#### 結果") output_csv_download_scrape = gr.File(label="結果CSVダウンロード", interactive=False) # HTMLフォルダパスを表示するためのテキストボックス (読み取り専用) html_output_folder_path_display = gr.Textbox(label="HTML保存先フォルダパス (口コミ検索タブで使用)", interactive=False) with gr.TabItem("② 口コミ検索"): gr.Markdown("### 口コミ検索・エクスポート") gr.Markdown( """ **検索ソース**を選択し、HTMLフォルダパスまたはCSVファイルを指定して口コミを検索・エクスポートします。 - **HTMLフォルダから検索:** タブ1で出力されたHTMLファイル群が含まれる**ベースディレクトリ内の `html_files` フォルダ**、または他のHTMLファイル群を含むフォルダを指定してください。 - **CSVファイルから検索:** タブ1で出力された結果CSV、または同様の形式のCSVファイルをアップロードしてください。 """ ) with gr.Row(): with gr.Column(scale=1): search_source_review = gr.Radio( choices=["HTMLフォルダから検索", "CSVファイルから検索"], label="検索ソースを選択", value="HTMLフォルダから検索" ) html_folder_path_review = gr.Textbox( label="HTMLフォルダパス", placeholder="例: gmap_scraping_output/html_files", visible=True # 初期表示 ) uploaded_csv_review = gr.File( label="検索対象CSVファイル", file_types=[".csv"], visible=False # 初期非表示 ) search_column_review = gr.Dropdown( label="検索対象の列 (CSV選択時)", choices=[], interactive=True, visible=False # 初期非表示 ) keyword_review = gr.Textbox(label="検索キーワード (空欄で全件)") search_button_review = gr.Button("検索実行", variant="primary") export_all_button_review = gr.Button("全件CSVエクスポート") with gr.Column(scale=2): gr.Markdown("#### 検索/エクスポート結果") status_textbox_review = gr.Textbox(label="処理状況", lines=5, interactive=False) output_table_review = gr.Dataframe(label="検索結果(テーブル)") search_csv_output_review = gr.File(label="検索結果CSVダウンロード", interactive=False) all_reviews_csv_output_review = gr.File(label="全件エクスポートCSVダウンロード", interactive=False) progress_bar_review = gr.Progress(track_tqdm=True) # 口コミ収集/エクスポート用 # --- イベントハンドラ定義 --- # --- タブ1: スクレイピング --- start_button_scrape.click( fn=run_scraping, inputs=[input_csv_file_scrape, output_dir_name_scrape, output_csv_name_scrape, csv_encoding_scrape, wait_time_base_scrape, wait_time_detail_scrape, wait_time_search_scrape, headless_mode_scrape], outputs=[status_textbox_scrape, output_csv_download_scrape, html_output_folder_path_display], # progress 引数は Gradio 側で自動的に渡される (show_progress='full' の場合) ) stop_button_scrape.click(fn=request_interrupt, inputs=None, outputs=status_textbox_scrape) # ログに中断リクエストを表示 # --- タブ2: 口コミ検索 --- # 検索ソースの選択に応じてUI表示を切り替え def update_review_source_ui(source): if source == "HTMLフォルダから検索": return { html_folder_path_review: gr.Textbox(visible=True), uploaded_csv_review: gr.File(visible=False, value=None), # クリア search_column_review: gr.Dropdown(visible=False, value=None, choices=[]) # クリア } elif source == "CSVファイルから検索": return { html_folder_path_review: gr.Textbox(visible=False, value=""), # クリア uploaded_csv_review: gr.File(visible=True), search_column_review: gr.Dropdown(visible=True) # 列選択を表示 } else: return { # デフォルト html_folder_path_review: gr.Textbox(visible=True), uploaded_csv_review: gr.File(visible=False, value=None), search_column_review: gr.Dropdown(visible=False, value=None, choices=[]) } search_source_review.change( fn=update_review_source_ui, inputs=search_source_review, outputs=[html_folder_path_review, uploaded_csv_review, search_column_review] ) # CSVアップロード時に列名を取得してドロップダウンを更新 uploaded_csv_review.upload( fn=get_csv_columns_safe, inputs=uploaded_csv_review, outputs=search_column_review ) # 検索ボタンのクリック search_button_review.click( fn=search_reviews_controller, inputs=[search_source_review, html_folder_path_review, uploaded_csv_review, search_column_review, keyword_review], outputs=[status_textbox_review, output_table_review, search_csv_output_review, status_textbox_scrape] # 最後のログはデバッグ用に入れておく ) # 全件エクスポートボタンのクリック export_all_button_review.click( fn=export_all_reviews_controller, inputs=[search_source_review, html_folder_path_review, uploaded_csv_review], outputs=[status_textbox_review, all_reviews_csv_output_review, status_textbox_scrape] # 最後のログはデバッグ用 ) # --- UI起動 --- print("Gradio UIを起動します...") # queue()で複数ユーザー対応、share=Trueで共有リンク生成 (Colabでは自動的に共有リンク) # launch() に debug=True をつけるとリロードなどが有効になるが、不安定になることもある demo.queue().launch(share=False, debug=False)