Spaces:
Runtime error
Runtime error
File size: 8,163 Bytes
b42a7a4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
import asyncio
from playwright.async_api import async_playwright, Page, Browser
from bs4 import BeautifulSoup
from bs4.element import Comment # BeautifulSoupのコメント削除用
from urllib.parse import urlparse, parse_qs
from typing import List, Dict, Optional
class WebScraper:
"""
DuckDuckGoでの検索、URLからのコンテンツ取得、HTMLクリーンアップを行うクラス。
"""
def __init__(self, headless: bool = True, default_timeout: int = 30000):
"""
WebScraperのインスタンスを初期化します。
Args:
headless (bool): Playwrightをヘッドレスモードで実行するかどうか (デフォルト: True)。
default_timeout (int): ページのロードタイムアウト (ミリ秒、デフォルト: 30000 = 30秒)。
"""
self.headless = headless
self.default_timeout = default_timeout
self._browser: Optional[Browser] = None # Browserインスタンスを保持するため
async def _launch_browser(self) -> Browser:
"""ブラウザを起動し、インスタンス変数に格納します。"""
if not self._browser or not self._browser.is_connected():
self._browser = await async_playwright().chromium.launch(headless=self.headless)
return self._browser
async def _close_browser(self):
"""ブラウザを閉じます。"""
if self._browser and self._browser.is_connected():
await self._browser.close()
self._browser = None
async def _get_new_page(self) -> Page:
"""新しいページ(タブ)を作成します。"""
browser = await self._launch_browser()
page = await browser.new_page()
page.set_default_timeout(self.default_timeout)
return page
async def search_duckduckgo(self, query: str, num_results: int = 3) -> List[Dict[str, str]]:
"""
DuckDuckGoで指定されたクエリを検索し、上位N件の検索結果(タイトルとURL)を返します。
"""
results = []
page: Optional[Page] = None # 明示的に型ヒントを追加
try:
page = await self._get_new_page()
print(f"DuckDuckGoで '{query}' を検索中...")
await page.goto("https://duckduckgo.com/")
await page.fill("#search_form_input_homepage", query)
await page.press("#search_form_input_homepage", "Enter")
await page.wait_for_selector("#links .result__a", timeout=10000)
search_elements = await page.query_selector_all("#links .result")
for i, element in enumerate(search_elements):
if i >= num_results:
break
title_element = await element.query_selector(".result__a")
url_element = await element.query_selector(".result__url")
title = await title_element.text_content() if title_element else "タイトルなし"
url = await url_element.get_attribute("href") if url_element else "URLなし"
# DuckDuckGoのURLのデコードとクリーンアップ
if url and url != "URLなし":
parsed_url = urlparse(url)
if parsed_url.path == '/l/':
decoded_url = parse_qs(parsed_url.query).get('uddg', [''])[0]
url = decoded_url
results.append({"title": title.strip(), "url": url.strip()})
except Exception as e:
print(f"DuckDuckGo検索中にエラーが発生しました: {e}")
finally:
if page:
await page.close() # ページを閉じる
print(f"検索が完了しました。{len(results)} 件の結果が見つかりました。")
return results
async def _get_raw_html_content(self, url: str) -> Optional[str]:
"""指定されたURLから生のHTMLコンテンツを取得します。"""
page: Optional[Page] = None
try:
page = await self._get_new_page()
print(f" URL: {url} のコンテンツを取得中...")
await page.goto(url)
return await page.content()
except Exception as e:
print(f" URL: {url} のコンテンツ取得中にエラーが発生しました: {e}")
return None
finally:
if page:
await page.close()
def _clean_html_to_text(self, html_content: str) -> str:
"""
HTMLコンテンツからJavaScript、スタイル、不要なリンクなどを除去し、整形されたテキストを返します。
"""
soup = BeautifulSoup(html_content, 'html.parser')
# スクリプトタグとスタイルタグを削除
for script_or_style in soup(["script", "style"]):
script_or_style.decompose()
# headタグ内のリンクタグ(CSSなど)を削除
if soup.head:
for link_tag in soup.head.find_all('link'):
link_tag.decompose()
# HTMLコメントを削除
for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
comment.extract()
# 複数の連続する改行を1つに減らす
cleaned_text = soup.get_text(separator='\n', strip=True)
cleaned_text_lines = [line.strip() for line in cleaned_text.splitlines() if line.strip()]
return '\n'.join(cleaned_text_lines)
async def get_processed_documents(self, search_query: str, num_search_results: int = 2) -> List[Dict[str, str]]:
"""
DuckDuckGoで検索し、上位N件の検索結果のURLからクリーンなHTMLコンテンツを取得します。
Args:
search_query (str): 検索クエリ。
num_search_results (int): 取得する検索結果の数。
Returns:
List[Dict[str, str]]: 処理されたドキュメントのリスト。
各ドキュメントは 'title', 'original_url', 'cleaned_html_content' を含む。
"""
processed_documents = []
# Playwrightの非同期コンテキストマネージャでブラウザインスタンスを管理
async with async_playwright() as p:
# ブラウザを一度だけ起動し、インスタンス変数に保持
self._browser = await p.chromium.launch(headless=self.headless)
top_results = await self.search_duckduckgo(search_query, num_search_results)
if top_results:
for i, result in enumerate(top_results):
print(f"\n--- 処理中の記事 {i+1} ---")
print(f"タイトル: {result['title']}")
print(f"元URL: {result['url']}")
# 個別のURLのコンテンツを取得・クリーンアップ
raw_html = await self._get_raw_html_content(result['url'])
if raw_html:
cleaned_content = self._clean_html_to_text(raw_html)
processed_documents.append({
"title": result['title'],
"original_url": result['url'],
"cleaned_html_content": cleaned_content
})
print(f" クリーンなコンテンツの長さ: {len(cleaned_content)} 文字")
print(f" クリーンなコンテンツ(一部):\n{cleaned_content[:500]}...")
else:
print(" クリーンなコンテンツを取得できませんでした。")
else:
print("検索結果が見つからなかったため、処理をスキップします。")
await self._close_browser() # 全ての処理後にブラウザを閉じる
return processed_documents
# クラスの使用例
|