Spaces:
Runtime error
Runtime error
| import asyncio | |
| from playwright.async_api import async_playwright, Page, Browser, Playwright | |
| from bs4 import BeautifulSoup | |
| from bs4.element import Comment | |
| from urllib.parse import urlparse, parse_qs | |
| from typing import List, Dict, Optional | |
| import random # randomモジュールを追加 | |
| class WebScraper: | |
| """ | |
| DuckDuckGoでの検索、URLからのコンテンツ取得、HTMLクリーンアップを行うクラス。 | |
| """ | |
| # User-Agentのリストをクラス変数として定義 | |
| USER_AGENTS = [ | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36", | |
| "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36", | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36", | |
| "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36", | |
| "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36", | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/108.0", | |
| "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/108.0", | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", # 最新版に近いChrome | |
| "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15", # Safari | |
| "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 OPR/106.0.0.0", # Opera | |
| ] | |
| def __init__(self, headless: bool = True, default_timeout: int = 30000): | |
| """ | |
| WebScraperのインスタンスを初期化します。 | |
| Args: | |
| headless (bool): Playwrightをヘッドレスモードで実行するかどうか (デフォルト: True)。 | |
| default_timeout (int): ページのロードタイムアウト (ミリ秒、デフォルト: 30000 = 30秒)。 | |
| """ | |
| self.headless = headless | |
| self.default_timeout = default_timeout | |
| self._browser: Optional[Browser] = None | |
| self._playwright_instance: Optional[Playwright] = None # Playwrightインスタンスを保持 | |
| async def _launch_browser(self) -> Browser: | |
| """Playwrightを起動し、ブラウザを立ち上げます。 | |
| 既にブラウザが起動していればそれを再利用します。 | |
| """ | |
| if not self._browser or not self._browser.is_connected(): | |
| if self._playwright_instance is None: | |
| self._playwright_instance = await async_playwright().start() | |
| # ヘッドレスモードでの検出を避けるための引数を追加 | |
| self._browser = await self._playwright_instance.chromium.launch( | |
| headless=self.headless, | |
| args=[ | |
| '--no-sandbox', | |
| '--disable-setuid-sandbox', | |
| '--disable-infobars', | |
| '--window-size=1280,720', # 一般的なデスクトップサイズに設定 | |
| '--disable-blink-features=AutomationControlled' # ヘッドレス検出を回避 | |
| ] | |
| ) | |
| return self._browser | |
| async def _close_browser(self): | |
| """ブラウザを閉じ、Playwrightインスタンスも停止します。""" | |
| if self._browser and self._browser.is_connected(): | |
| await self._browser.close() | |
| self._browser = None | |
| if self._playwright_instance: | |
| await self._playwright_instance.stop() | |
| self._playwright_instance = None | |
| async def _get_new_page(self) -> Page: | |
| """新しいページ(タブ)を作成します。""" | |
| browser = await self._launch_browser() # ブラウザが起動または取得される | |
| page = await browser.new_page() | |
| page.set_default_timeout(self.default_timeout) | |
| # User-Agentをランダムに選択して設定 | |
| await page.set_extra_http_headers({ | |
| "User-Agent": random.choice(self.USER_AGENTS) | |
| }) | |
| # より包括的なステルス対策をページに適用 | |
| await page.evaluate("""Object.defineProperty(navigator, 'webdriver', { get: () => false });""") | |
| await page.evaluate("""Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });""") | |
| await page.evaluate("""Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });""") | |
| await page.evaluate("""window.chrome = { runtime: {}, loadTimes: function() {}, csi: function() {}, app: {} };""") | |
| await page.evaluate("""Object.defineProperty(navigator.permissions, 'query', { enumerable: true, configurable: true, writable: true, value: async (parameters) => ({ state: 'prompt' }) });""") | |
| # ページロード後の追加待機 | |
| await asyncio.sleep(random.uniform(2, 5)) | |
| return page | |
| async def search_duckduckgo(self, query: str, num_results: int = 3) -> List[Dict[str, str]]: | |
| """ | |
| DuckDuckGoで指定されたクエリを検索し、上位N件の検索結果(タイトルとURL)を返します。 | |
| """ | |
| results = [] | |
| page: Optional[Page] = None | |
| try: | |
| page = await self._get_new_page() | |
| print(f"Bingで '{query}' を検索中...") | |
| # networkidle でより安定したページロードを待機 | |
| await page.goto(f"https://www.bing.com/search?q=={query}&setlang=ja", wait_until='networkidle') | |
| # デバッグのためにページのスクリーンショットを保存 | |
| await page.screenshot(path="./duckduckgo_search_results.png") | |
| # 検索結果のタイトルリンク要素を特定するセレクタ | |
| # DuckDuckGoのHTML構造は変更される可能性があるため、適宜調整が必要です。 | |
| # 現在の一般的なセレクタは 'a[data-testid="result-title-link"]' もしくは 'h2 > a' ですが、 | |
| # ページ構造が変わった場合は、開発者ツールで適切なセレクタを見つけてください。 | |
| # 要素が見つかるまで、より長く待機するか、別のセレクタを試す | |
| try: | |
| await page.wait_for_selector('h2 > a', timeout=20000) # タイムアウトを長くする | |
| except Exception as e: | |
| print(f"セレクタ 'h2 > a' の待機中にタイムアウトしました: {e}") | |
| # ここで代替のセレクタを試すか、処理を終了する | |
| return [] | |
| search_links = page.locator('h2 > a') | |
| for i in range(min(num_results, await search_links.count())): | |
| link_element = search_links.nth(i) | |
| title = await link_element.text_content() | |
| url = await link_element.get_attribute("href") | |
| if url: | |
| parsed_url = urlparse(url) | |
| # DuckDuckGoのリダイレクトURLのデコードとクリーンアップ | |
| if parsed_url.netloc == 'duckduckgo.com' and parsed_url.path == '/l/': | |
| decoded_url = parse_qs(parsed_url.query).get('uddg', [''])[0] | |
| url = decoded_url | |
| # 結果を追加する前に、タイトルとURLが有効か軽くチェック | |
| if title and url and title.strip() != "" and url.strip() != "": | |
| results.append({"title": title.strip(), "url": url.strip()}) | |
| # 検索結果が一つも見つからなかった場合もスクリーンショットを保存 | |
| if not results: | |
| print(f"検索結果が見つかりませんでした。ページのスクリーンショットを './duckduckgo_no_results.png' に保存します。") | |
| await page.screenshot(path="./duckduckgo_no_results.png") | |
| except Exception as e: | |
| print(f"DuckDuckGo検索中にエラーが発生しました: {e}") | |
| finally: | |
| if page: | |
| await page.close() # ページを閉じる | |
| print(f"検索が完了しました。{len(results)} 件の結果が見つかりました。") | |
| return results | |
| async def _get_raw_html_content(self, url: str) -> Optional[str]: | |
| """指定されたURLから生のHTMLコンテンツを取得します。""" | |
| page: Optional[Page] = None | |
| try: | |
| page = await self._get_new_page() | |
| print(f" URL: {url} のコンテンツを取得中...") | |
| # networkidle でより安定したページロードを待機 | |
| await page.goto(url) | |
| return await page.content() | |
| except Exception as e: | |
| print(f" URL: {url} のコンテンツ取得中にエラーが発生しました: {e}") | |
| return None | |
| finally: | |
| if page: | |
| await page.close() | |
| def _clean_html_to_text(self, html_content: str) -> str: | |
| """ | |
| HTMLコンテンツからJavaScript、スタイル、不要なリンクなどを除去し、整形されたテキストを返します。 | |
| """ | |
| soup = BeautifulSoup(html_content, 'html.parser') | |
| # スクリプトタグとスタイルタグを削除 | |
| for script_or_style in soup(["script", "style"]): | |
| script_or_style.decompose() | |
| # headタグ内のリンクタグ(CSSなど)を削除 | |
| if soup.head: | |
| for link_tag in soup.head.find_all('link'): | |
| link_tag.decompose() | |
| # HTMLコメントを削除 | |
| for comment in soup.find_all(string=lambda text: isinstance(text, Comment)): | |
| comment.extract() | |
| # 複数の連続する改行を1つに減らす | |
| cleaned_text = soup.get_text(separator='\n', strip=True) | |
| # 空行を削除し、各行をトリム | |
| cleaned_text_lines = [line.strip() for line in cleaned_text.splitlines() if line.strip()] | |
| return '\n'.join(cleaned_text_lines) | |
| async def get_processed_documents(self, search_query: str, num_search_results: int = 2) -> List[Dict[str, str]]: | |
| """ | |
| DuckDuckGoで検索し、上位N件の検索結果のURLからクリーンなHTMLコンテンツを取得します。 | |
| Args: | |
| search_query (str): 検索クエリ。 | |
| num_search_results (int): 取得する検索結果の数。 | |
| Returns: | |
| List[Dict[str, str]]: 処理されたドキュメントのリスト。 | |
| 各ドキュメントは 'title', 'original_url', 'cleaned_html_content' を含む。 | |
| """ | |
| processed_documents = [] | |
| # クラスのインスタンスでブラウザのライフサイクルを管理 | |
| try: | |
| top_results = await self.search_duckduckgo(search_query, num_search_results) | |
| if top_results: | |
| for i, result in enumerate(top_results): | |
| print(f"\n--- 処理中の記事 {i+1} ---") | |
| print(f"タイトル: {result['title']}") | |
| print(f"元URL: {result['url']}") | |
| # 個別のURLのコンテンツを取得・クリーンアップ | |
| raw_html = await self._get_raw_html_content(result['url']) | |
| if raw_html: | |
| cleaned_content = self._clean_html_to_text(raw_html) | |
| processed_documents.append({ | |
| "title": result['title'], | |
| "original_url": result['url'], | |
| "cleaned_html_content": cleaned_content | |
| }) | |
| print(f" クリーンなコンテンツの長さ: {len(cleaned_content)} 文字") | |
| print(f" クリーンなコンテンツ(一部):\n{cleaned_content[:500]}...") | |
| else: | |
| print(" クリーンなコンテンツを取得できませんでした。") | |
| else: | |
| print("検索結果が見つからなかったため、処理をスキップします。") | |
| finally: | |
| # すべての処理が完了したらブラウザを閉じる | |
| await self._close_browser() | |
| return processed_documents | |
| # クラスの使用例 | |
| async def main(): | |
| scraper = WebScraper(headless=False) # まずはheadless=Trueで試してください | |
| query = "個人情報流出 事例" | |
| documents = await scraper.get_processed_documents(query, num_search_results=2) | |
| if documents: | |
| print("\n--- 処理されたドキュメント ---") | |
| for doc in documents: | |
| print(f"タイトル: {doc['title']}") | |
| print(f"URL: {doc['original_url']}") | |
| # print(f"コンテンツの長さ: {len(doc['cleaned_html_content'])} 文字") | |
| # print(f"コンテンツの一部: {doc['cleaned_html_content'][:200]}...\n") | |
| else: | |
| print("処理されたドキュメントはありませんでした。") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |