Spaces:
Runtime error
Runtime error
import asyncio | |
from playwright.async_api import async_playwright, Page, Browser | |
from bs4 import BeautifulSoup | |
from bs4.element import Comment # BeautifulSoupのコメント削除用 | |
from urllib.parse import urlparse, parse_qs | |
from typing import List, Dict, Optional | |
class WebScraper: | |
""" | |
DuckDuckGoでの検索、URLからのコンテンツ取得、HTMLクリーンアップを行うクラス。 | |
""" | |
def __init__(self, headless: bool = True, default_timeout: int = 30000): | |
""" | |
WebScraperのインスタンスを初期化します。 | |
Args: | |
headless (bool): Playwrightをヘッドレスモードで実行するかどうか (デフォルト: True)。 | |
default_timeout (int): ページのロードタイムアウト (ミリ秒、デフォルト: 30000 = 30秒)。 | |
""" | |
self.headless = headless | |
self.default_timeout = default_timeout | |
self._browser: Optional[Browser] = None # Browserインスタンスを保持するため | |
async def _launch_browser(self) -> Browser: | |
"""ブラウザを起動し、インスタンス変数に格納します。""" | |
if not self._browser or not self._browser.is_connected(): | |
self._browser = await async_playwright().chromium.launch(headless=self.headless) | |
return self._browser | |
async def _close_browser(self): | |
"""ブラウザを閉じます。""" | |
if self._browser and self._browser.is_connected(): | |
await self._browser.close() | |
self._browser = None | |
async def _get_new_page(self) -> Page: | |
"""新しいページ(タブ)を作成します。""" | |
browser = await self._launch_browser() | |
page = await browser.new_page() | |
page.set_default_timeout(self.default_timeout) | |
return page | |
async def search_duckduckgo(self, query: str, num_results: int = 3) -> List[Dict[str, str]]: | |
""" | |
DuckDuckGoで指定されたクエリを検索し、上位N件の検索結果(タイトルとURL)を返します。 | |
""" | |
results = [] | |
page: Optional[Page] = None # 明示的に型ヒントを追加 | |
try: | |
page = await self._get_new_page() | |
print(f"DuckDuckGoで '{query}' を検索中...") | |
await page.goto("https://duckduckgo.com/") | |
await page.fill("#search_form_input_homepage", query) | |
await page.press("#search_form_input_homepage", "Enter") | |
await page.wait_for_selector("#links .result__a", timeout=10000) | |
search_elements = await page.query_selector_all("#links .result") | |
for i, element in enumerate(search_elements): | |
if i >= num_results: | |
break | |
title_element = await element.query_selector(".result__a") | |
url_element = await element.query_selector(".result__url") | |
title = await title_element.text_content() if title_element else "タイトルなし" | |
url = await url_element.get_attribute("href") if url_element else "URLなし" | |
# DuckDuckGoのURLのデコードとクリーンアップ | |
if url and url != "URLなし": | |
parsed_url = urlparse(url) | |
if parsed_url.path == '/l/': | |
decoded_url = parse_qs(parsed_url.query).get('uddg', [''])[0] | |
url = decoded_url | |
results.append({"title": title.strip(), "url": url.strip()}) | |
except Exception as e: | |
print(f"DuckDuckGo検索中にエラーが発生しました: {e}") | |
finally: | |
if page: | |
await page.close() # ページを閉じる | |
print(f"検索が完了しました。{len(results)} 件の結果が見つかりました。") | |
return results | |
async def _get_raw_html_content(self, url: str) -> Optional[str]: | |
"""指定されたURLから生のHTMLコンテンツを取得します。""" | |
page: Optional[Page] = None | |
try: | |
page = await self._get_new_page() | |
print(f" URL: {url} のコンテンツを取得中...") | |
await page.goto(url) | |
return await page.content() | |
except Exception as e: | |
print(f" URL: {url} のコンテンツ取得中にエラーが発生しました: {e}") | |
return None | |
finally: | |
if page: | |
await page.close() | |
def _clean_html_to_text(self, html_content: str) -> str: | |
""" | |
HTMLコンテンツからJavaScript、スタイル、不要なリンクなどを除去し、整形されたテキストを返します。 | |
""" | |
soup = BeautifulSoup(html_content, 'html.parser') | |
# スクリプトタグとスタイルタグを削除 | |
for script_or_style in soup(["script", "style"]): | |
script_or_style.decompose() | |
# headタグ内のリンクタグ(CSSなど)を削除 | |
if soup.head: | |
for link_tag in soup.head.find_all('link'): | |
link_tag.decompose() | |
# HTMLコメントを削除 | |
for comment in soup.find_all(string=lambda text: isinstance(text, Comment)): | |
comment.extract() | |
# 複数の連続する改行を1つに減らす | |
cleaned_text = soup.get_text(separator='\n', strip=True) | |
cleaned_text_lines = [line.strip() for line in cleaned_text.splitlines() if line.strip()] | |
return '\n'.join(cleaned_text_lines) | |
async def get_processed_documents(self, search_query: str, num_search_results: int = 2) -> List[Dict[str, str]]: | |
""" | |
DuckDuckGoで検索し、上位N件の検索結果のURLからクリーンなHTMLコンテンツを取得します。 | |
Args: | |
search_query (str): 検索クエリ。 | |
num_search_results (int): 取得する検索結果の数。 | |
Returns: | |
List[Dict[str, str]]: 処理されたドキュメントのリスト。 | |
各ドキュメントは 'title', 'original_url', 'cleaned_html_content' を含む。 | |
""" | |
processed_documents = [] | |
# Playwrightの非同期コンテキストマネージャでブラウザインスタンスを管理 | |
async with async_playwright() as p: | |
# ブラウザを一度だけ起動し、インスタンス変数に保持 | |
self._browser = await p.chromium.launch(headless=self.headless) | |
top_results = await self.search_duckduckgo(search_query, num_search_results) | |
if top_results: | |
for i, result in enumerate(top_results): | |
print(f"\n--- 処理中の記事 {i+1} ---") | |
print(f"タイトル: {result['title']}") | |
print(f"元URL: {result['url']}") | |
# 個別のURLのコンテンツを取得・クリーンアップ | |
raw_html = await self._get_raw_html_content(result['url']) | |
if raw_html: | |
cleaned_content = self._clean_html_to_text(raw_html) | |
processed_documents.append({ | |
"title": result['title'], | |
"original_url": result['url'], | |
"cleaned_html_content": cleaned_content | |
}) | |
print(f" クリーンなコンテンツの長さ: {len(cleaned_content)} 文字") | |
print(f" クリーンなコンテンツ(一部):\n{cleaned_content[:500]}...") | |
else: | |
print(" クリーンなコンテンツを取得できませんでした。") | |
else: | |
print("検索結果が見つからなかったため、処理をスキップします。") | |
await self._close_browser() # 全ての処理後にブラウザを閉じる | |
return processed_documents | |
# クラスの使用例 | |