File size: 8,163 Bytes
b42a7a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import asyncio
from playwright.async_api import async_playwright, Page, Browser
from bs4 import BeautifulSoup
from bs4.element import Comment # BeautifulSoupのコメント削除用
from urllib.parse import urlparse, parse_qs
from typing import List, Dict, Optional

class WebScraper:
    """
    DuckDuckGoでの検索、URLからのコンテンツ取得、HTMLクリーンアップを行うクラス。
    """
    def __init__(self, headless: bool = True, default_timeout: int = 30000):
        """
        WebScraperのインスタンスを初期化します。

        Args:
            headless (bool): Playwrightをヘッドレスモードで実行するかどうか (デフォルト: True)。
            default_timeout (int): ページのロードタイムアウト (ミリ秒、デフォルト: 30000 = 30秒)。
        """
        self.headless = headless
        self.default_timeout = default_timeout
        self._browser: Optional[Browser] = None # Browserインスタンスを保持するため

    async def _launch_browser(self) -> Browser:
        """ブラウザを起動し、インスタンス変数に格納します。"""
        if not self._browser or not self._browser.is_connected():
            self._browser = await async_playwright().chromium.launch(headless=self.headless)
        return self._browser

    async def _close_browser(self):
        """ブラウザを閉じます。"""
        if self._browser and self._browser.is_connected():
            await self._browser.close()
        self._browser = None

    async def _get_new_page(self) -> Page:
        """新しいページ(タブ)を作成します。"""
        browser = await self._launch_browser()
        page = await browser.new_page()
        page.set_default_timeout(self.default_timeout)
        return page

    async def search_duckduckgo(self, query: str, num_results: int = 3) -> List[Dict[str, str]]:
        """
        DuckDuckGoで指定されたクエリを検索し、上位N件の検索結果(タイトルとURL)を返します。
        """
        results = []
        page: Optional[Page] = None # 明示的に型ヒントを追加
        try:
            page = await self._get_new_page()
            print(f"DuckDuckGoで '{query}' を検索中...")
            await page.goto("https://duckduckgo.com/")

            await page.fill("#search_form_input_homepage", query)
            await page.press("#search_form_input_homepage", "Enter")

            await page.wait_for_selector("#links .result__a", timeout=10000)

            search_elements = await page.query_selector_all("#links .result")
            
            for i, element in enumerate(search_elements):
                if i >= num_results:
                    break
                
                title_element = await element.query_selector(".result__a")
                url_element = await element.query_selector(".result__url")

                title = await title_element.text_content() if title_element else "タイトルなし"
                url = await url_element.get_attribute("href") if url_element else "URLなし"
                
                # DuckDuckGoのURLのデコードとクリーンアップ
                if url and url != "URLなし":
                    parsed_url = urlparse(url)
                    if parsed_url.path == '/l/':
                        decoded_url = parse_qs(parsed_url.query).get('uddg', [''])[0]
                        url = decoded_url
                    
                results.append({"title": title.strip(), "url": url.strip()})
        except Exception as e:
            print(f"DuckDuckGo検索中にエラーが発生しました: {e}")
        finally:
            if page:
                await page.close() # ページを閉じる
        
        print(f"検索が完了しました。{len(results)} 件の結果が見つかりました。")
        return results

    async def _get_raw_html_content(self, url: str) -> Optional[str]:
        """指定されたURLから生のHTMLコンテンツを取得します。"""
        page: Optional[Page] = None
        try:
            page = await self._get_new_page()
            print(f"  URL: {url} のコンテンツを取得中...")
            await page.goto(url)
            return await page.content()
        except Exception as e:
            print(f"  URL: {url} のコンテンツ取得中にエラーが発生しました: {e}")
            return None
        finally:
            if page:
                await page.close()

    def _clean_html_to_text(self, html_content: str) -> str:
        """
        HTMLコンテンツからJavaScript、スタイル、不要なリンクなどを除去し、整形されたテキストを返します。
        """
        soup = BeautifulSoup(html_content, 'html.parser')

        # スクリプトタグとスタイルタグを削除
        for script_or_style in soup(["script", "style"]):
            script_or_style.decompose()

        # headタグ内のリンクタグ(CSSなど)を削除
        if soup.head:
            for link_tag in soup.head.find_all('link'):
                link_tag.decompose()
        
        # HTMLコメントを削除
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()

        # 複数の連続する改行を1つに減らす
        cleaned_text = soup.get_text(separator='\n', strip=True)
        cleaned_text_lines = [line.strip() for line in cleaned_text.splitlines() if line.strip()]
        return '\n'.join(cleaned_text_lines)

    async def get_processed_documents(self, search_query: str, num_search_results: int = 2) -> List[Dict[str, str]]:
        """
        DuckDuckGoで検索し、上位N件の検索結果のURLからクリーンなHTMLコンテンツを取得します。

        Args:
            search_query (str): 検索クエリ。
            num_search_results (int): 取得する検索結果の数。

        Returns:
            List[Dict[str, str]]: 処理されたドキュメントのリスト。
                                  各ドキュメントは 'title', 'original_url', 'cleaned_html_content' を含む。
        """
        processed_documents = []
        
        # Playwrightの非同期コンテキストマネージャでブラウザインスタンスを管理
        async with async_playwright() as p:
            # ブラウザを一度だけ起動し、インスタンス変数に保持
            self._browser = await p.chromium.launch(headless=self.headless)

            top_results = await self.search_duckduckgo(search_query, num_search_results)

            if top_results:
                for i, result in enumerate(top_results):
                    print(f"\n--- 処理中の記事 {i+1} ---")
                    print(f"タイトル: {result['title']}")
                    print(f"元URL: {result['url']}")
                    
                    # 個別のURLのコンテンツを取得・クリーンアップ
                    raw_html = await self._get_raw_html_content(result['url'])
                    
                    if raw_html:
                        cleaned_content = self._clean_html_to_text(raw_html)
                        processed_documents.append({
                            "title": result['title'],
                            "original_url": result['url'],
                            "cleaned_html_content": cleaned_content
                        })
                        print(f"  クリーンなコンテンツの長さ: {len(cleaned_content)} 文字")
                        print(f"  クリーンなコンテンツ(一部):\n{cleaned_content[:500]}...")
                    else:
                        print("  クリーンなコンテンツを取得できませんでした。")
            else:
                print("検索結果が見つからなかったため、処理をスキップします。")
            
            await self._close_browser() # 全ての処理後にブラウザを閉じる

        return processed_documents

# クラスの使用例