AIdentify / search.py
syurein
search機能の実装
68c46ce
raw
history blame
10.7 kB
import asyncio
from playwright.async_api import async_playwright, Page, Browser, Playwright
from bs4 import BeautifulSoup
from bs4.element import Comment
from urllib.parse import urlparse, parse_qs
from typing import List, Dict, Optional
class WebScraper:
"""
DuckDuckGoでの検索、URLからのコンテンツ取得、HTMLクリーンアップを行うクラス。
"""
def __init__(self, headless: bool = True, default_timeout: int = 30000):
"""
WebScraperのインスタンスを初期化します。
Args:
headless (bool): Playwrightをヘッドレスモードで実行するかどうか (デフォルト: True)。
default_timeout (int): ページのロードタイムアウト (ミリ秒、デフォルト: 30000 = 30秒)。
"""
self.headless = headless
self.default_timeout = default_timeout
self._browser: Optional[Browser] = None
self._playwright_instance: Optional[Playwright] = None # Playwrightインスタンスを保持
async def _launch_browser(self) -> Browser:
"""Playwrightを起動し、ブラウザを立ち上げます。
既にブラウザが起動していればそれを再利用します。
"""
if not self._browser or not self._browser.is_connected():
if self._playwright_instance is None:
self._playwright_instance = await async_playwright().start()
self._browser = await self._playwright_instance.chromium.launch(headless=self.headless)
return self._browser
async def _close_browser(self):
"""ブラウザを閉じ、Playwrightインスタンスも停止します。"""
if self._browser and self._browser.is_connected():
await self._browser.close()
self._browser = None
if self._playwright_instance:
await self._playwright_instance.stop()
self._playwright_instance = None
async def _get_new_page(self) -> Page:
"""新しいページ(タブ)を作成します。"""
browser = await self._launch_browser() # ブラウザが起動または取得される
page = await browser.new_page()
page.set_default_timeout(self.default_timeout)
return page
async def search_duckduckgo(self, query: str, num_results: int = 3) -> List[Dict[str, str]]:
"""
DuckDuckGoで指定されたクエリを検索し、上位N件の検索結果(タイトルとURL)を返します。
"""
results = []
page: Optional[Page] = None
try:
page = await self._get_new_page()
"""Playwrightのステルス技術を適用し、ボット検出を回避します。"""
await page.evaluate("""Object.defineProperty(navigator, 'webdriver', { get: () => false });""")
await page.evaluate("""Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });""")
await page.evaluate("""Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });""")
await page.evaluate("""window.chrome = { runtime: {}, loadTimes: function() {}, csi: function() {}, app: {} };""")
await page.evaluate("""Object.defineProperty(navigator.permissions, 'query', { enumerable: true, configurable: true, writable: true, value: async (parameters) => ({ state: 'prompt' }) });""")
print(f"DuckDuckGoで '{query}' を検索中...")
# DuckDuckGoの検索URLは一般的に `?q=` パラメータを使用します
await page.goto(f"https://duckduckgo.com/?q={query}")
# 検索結果のタイトルリンク要素を特定するセレクタ
# DuckDuckGoのHTML構造は変更される可能性があるため、適宜調整が必要
# 現在の一般的なセレクタは 'a[data-testid="result-title-link"]'
await page.wait_for_selector('h2 > a', timeout=10000)
# 検索結果のタイトルリンク要素を取得 (await は不要、Locatorオブジェクトを返す)
search_links = page.locator('h2 > a')
# 取得する結果の数を制限
for i in range(min(num_results, await search_links.count())):
link_element = search_links.nth(i)
# タイトルはリンク要素のテキストコンテンツ
title = await link_element.text_content()
# URLはリンク要素のhref属性
url = await link_element.get_attribute("href")
# DuckDuckGoのリダイレクトURLのデコードとクリーンアップ
if url:
parsed_url = urlparse(url)
# DuckDuckGoのリダイレクトURLかどうかをチェック
if parsed_url.netloc == 'duckduckgo.com' and parsed_url.path == '/l/':
decoded_url = parse_qs(parsed_url.query).get('uddg', [''])[0]
url = decoded_url
# 結果を追加する前に、タイトルとURLが有効か軽くチェック
if title and url and title.strip() != "" and url.strip() != "":
results.append({"title": title.strip(), "url": url.strip()})
except Exception as e:
print(f"DuckDuckGo検索中にエラーが発生しました: {e}")
finally:
if page:
await page.close() # ページを閉じる
print(f"検索が完了しました。{len(results)} 件の結果が見つかりました。")
return results
async def _get_raw_html_content(self, url: str) -> Optional[str]:
"""指定されたURLから生のHTMLコンテンツを取得します。"""
page: Optional[Page] = None
try:
page = await self._get_new_page()
print(f" URL: {url} のコンテンツを取得中...")
# 'domcontentloaded' は 'load' よりも高速な場合が多い
await page.goto(url, wait_until='domcontentloaded')
return await page.content()
except Exception as e:
print(f" URL: {url} のコンテンツ取得中にエラーが発生しました: {e}")
return None
finally:
if page:
await page.close()
def _clean_html_to_text(self, html_content: str) -> str:
"""
HTMLコンテンツからJavaScript、スタイル、不要なリンクなどを除去し、整形されたテキストを返します。
"""
soup = BeautifulSoup(html_content, 'html.parser')
# スクリプトタグとスタイルタグを削除
for script_or_style in soup(["script", "style"]):
script_or_style.decompose()
# headタグ内のリンクタグ(CSSなど)を削除
if soup.head:
for link_tag in soup.head.find_all('link'):
link_tag.decompose()
# HTMLコメントを削除
for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
comment.extract()
# 複数の連続する改行を1つに減らす
cleaned_text = soup.get_text(separator='\n', strip=True)
# 空行を削除し、各行をトリム
cleaned_text_lines = [line.strip() for line in cleaned_text.splitlines() if line.strip()]
return '\n'.join(cleaned_text_lines)
async def get_processed_documents(self, search_query: str, num_search_results: int = 2) -> List[Dict[str, str]]:
"""
DuckDuckGoで検索し、上位N件の検索結果のURLからクリーンなHTMLコンテンツを取得します。
Args:
search_query (str): 検索クエリ。
num_search_results (int): 取得する検索結果の数。
Returns:
List[Dict[str, str]]: 処理されたドキュメントのリスト。
各ドキュメントは 'title', 'original_url', 'cleaned_html_content' を含む。
"""
processed_documents = []
# クラスのインスタンスでブラウザのライフサイクルを管理
try:
top_results = await self.search_duckduckgo(search_query, num_search_results)
if top_results:
for i, result in enumerate(top_results):
print(f"\n--- 処理中の記事 {i+1} ---")
print(f"タイトル: {result['title']}")
print(f"元URL: {result['url']}")
# 個別のURLのコンテンツを取得・クリーンアップ
raw_html = await self._get_raw_html_content(result['url'])
if raw_html:
cleaned_content = self._clean_html_to_text(raw_html)
processed_documents.append({
"title": result['title'],
"original_url": result['url'],
"cleaned_html_content": cleaned_content
})
print(f" クリーンなコンテンツの長さ: {len(cleaned_content)} 文字")
print(f" クリーンなコンテンツ(一部):\n{cleaned_content[:500]}...")
else:
print(" クリーンなコンテンツを取得できませんでした。")
else:
print("検索結果が見つからなかったため、処理をスキップします。")
finally:
# すべての処理が完了したらブラウザを閉じる
await self._close_browser()
return processed_documents
# クラスの使用例
async def main():
scraper = WebScraper(headless=False) # デバッグのためにheadless=Falseにしても良い
query = "個人情報流出 事例"
documents = await scraper.get_processed_documents(query, num_search_results=2)
if documents:
print("\n--- 処理されたドキュメント ---")
for doc in documents:
print(f"タイトル: {doc['title']}")
print(f"URL: {doc['original_url']}")
# print(f"コンテンツの長さ: {len(doc['cleaned_html_content'])} 文字")
# print(f"コンテンツの一部: {doc['cleaned_html_content'][:200]}...\n")
else:
print("処理されたドキュメントはありませんでした。")
if __name__ == "__main__":
asyncio.run(main())