File size: 10,714 Bytes
b42a7a4
68c46ce
b42a7a4
68c46ce
b42a7a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68c46ce
 
b42a7a4
 
68c46ce
 
 
b42a7a4
68c46ce
 
 
b42a7a4
 
 
68c46ce
b42a7a4
 
 
68c46ce
 
 
b42a7a4
 
 
68c46ce
b42a7a4
 
 
 
 
 
 
 
 
68c46ce
 
b42a7a4
 
68c46ce
 
 
 
 
 
 
b42a7a4
68c46ce
 
b42a7a4
68c46ce
 
 
 
b42a7a4
68c46ce
 
b42a7a4
68c46ce
 
 
b42a7a4
68c46ce
 
 
 
b42a7a4
68c46ce
 
b42a7a4
68c46ce
 
b42a7a4
 
 
68c46ce
 
 
 
b42a7a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68c46ce
 
b42a7a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68c46ce
b42a7a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68c46ce
 
b42a7a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68c46ce
 
 
b42a7a4
 
 
 
68c46ce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import asyncio
from playwright.async_api import async_playwright, Page, Browser, Playwright
from bs4 import BeautifulSoup
from bs4.element import Comment
from urllib.parse import urlparse, parse_qs
from typing import List, Dict, Optional

class WebScraper:
    """
    DuckDuckGoでの検索、URLからのコンテンツ取得、HTMLクリーンアップを行うクラス。
    """
    def __init__(self, headless: bool = True, default_timeout: int = 30000):
        """
        WebScraperのインスタンスを初期化します。

        Args:
            headless (bool): Playwrightをヘッドレスモードで実行するかどうか (デフォルト: True)。
            default_timeout (int): ページのロードタイムアウト (ミリ秒、デフォルト: 30000 = 30秒)。
        """
        self.headless = headless
        self.default_timeout = default_timeout
        self._browser: Optional[Browser] = None
        self._playwright_instance: Optional[Playwright] = None # Playwrightインスタンスを保持

    async def _launch_browser(self) -> Browser:
        """Playwrightを起動し、ブラウザを立ち上げます。
        既にブラウザが起動していればそれを再利用します。
        """
        if not self._browser or not self._browser.is_connected():
            if self._playwright_instance is None:
                self._playwright_instance = await async_playwright().start()
            self._browser = await self._playwright_instance.chromium.launch(headless=self.headless)
        return self._browser

    async def _close_browser(self):
        """ブラウザを閉じ、Playwrightインスタンスも停止します。"""
        if self._browser and self._browser.is_connected():
            await self._browser.close()
        self._browser = None
        if self._playwright_instance:
            await self._playwright_instance.stop()
            self._playwright_instance = None

    async def _get_new_page(self) -> Page:
        """新しいページ(タブ)を作成します。"""
        browser = await self._launch_browser() # ブラウザが起動または取得される
        page = await browser.new_page()
        page.set_default_timeout(self.default_timeout)
        return page

    async def search_duckduckgo(self, query: str, num_results: int = 3) -> List[Dict[str, str]]:
        """
        DuckDuckGoで指定されたクエリを検索し、上位N件の検索結果(タイトルとURL)を返します。
        """
        results = []
        page: Optional[Page] = None
        
        try:
            page = await self._get_new_page()
            """Playwrightのステルス技術を適用し、ボット検出を回避します。"""
            await page.evaluate("""Object.defineProperty(navigator, 'webdriver', { get: () => false });""")
            await page.evaluate("""Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });""")
            await page.evaluate("""Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });""")
            await page.evaluate("""window.chrome = { runtime: {}, loadTimes: function() {}, csi: function() {}, app: {} };""")
            await page.evaluate("""Object.defineProperty(navigator.permissions, 'query', { enumerable: true, configurable: true, writable: true, value: async (parameters) => ({ state: 'prompt' }) });""")

            print(f"DuckDuckGoで '{query}' を検索中...")
            # DuckDuckGoの検索URLは一般的に `?q=` パラメータを使用します
            await page.goto(f"https://duckduckgo.com/?q={query}")

            # 検索結果のタイトルリンク要素を特定するセレクタ
            # DuckDuckGoのHTML構造は変更される可能性があるため、適宜調整が必要
            # 現在の一般的なセレクタは 'a[data-testid="result-title-link"]'
            await page.wait_for_selector('h2 > a', timeout=10000)

            # 検索結果のタイトルリンク要素を取得 (await は不要、Locatorオブジェクトを返す)
            search_links = page.locator('h2 > a')

            # 取得する結果の数を制限
            for i in range(min(num_results, await search_links.count())):
                link_element = search_links.nth(i)
                
                # タイトルはリンク要素のテキストコンテンツ
                title = await link_element.text_content()
                # URLはリンク要素のhref属性
                url = await link_element.get_attribute("href")
                
                # DuckDuckGoのリダイレクトURLのデコードとクリーンアップ
                if url:
                    parsed_url = urlparse(url)
                    # DuckDuckGoのリダイレクトURLかどうかをチェック
                    if parsed_url.netloc == 'duckduckgo.com' and parsed_url.path == '/l/':
                        decoded_url = parse_qs(parsed_url.query).get('uddg', [''])[0]
                        url = decoded_url
                    
                    # 結果を追加する前に、タイトルとURLが有効か軽くチェック
                    if title and url and title.strip() != "" and url.strip() != "":
                        results.append({"title": title.strip(), "url": url.strip()})

        except Exception as e:
            print(f"DuckDuckGo検索中にエラーが発生しました: {e}")
        finally:
            if page:
                await page.close() # ページを閉じる
        
        print(f"検索が完了しました。{len(results)} 件の結果が見つかりました。")
        return results

    async def _get_raw_html_content(self, url: str) -> Optional[str]:
        """指定されたURLから生のHTMLコンテンツを取得します。"""
        page: Optional[Page] = None
        try:
            page = await self._get_new_page()
            print(f"  URL: {url} のコンテンツを取得中...")
            # 'domcontentloaded' は 'load' よりも高速な場合が多い
            await page.goto(url, wait_until='domcontentloaded') 
            return await page.content()
        except Exception as e:
            print(f"  URL: {url} のコンテンツ取得中にエラーが発生しました: {e}")
            return None
        finally:
            if page:
                await page.close()

    def _clean_html_to_text(self, html_content: str) -> str:
        """
        HTMLコンテンツからJavaScript、スタイル、不要なリンクなどを除去し、整形されたテキストを返します。
        """
        soup = BeautifulSoup(html_content, 'html.parser')

        # スクリプトタグとスタイルタグを削除
        for script_or_style in soup(["script", "style"]):
            script_or_style.decompose()

        # headタグ内のリンクタグ(CSSなど)を削除
        if soup.head:
            for link_tag in soup.head.find_all('link'):
                link_tag.decompose()
        
        # HTMLコメントを削除
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()

        # 複数の連続する改行を1つに減らす
        cleaned_text = soup.get_text(separator='\n', strip=True)
        # 空行を削除し、各行をトリム
        cleaned_text_lines = [line.strip() for line in cleaned_text.splitlines() if line.strip()]
        return '\n'.join(cleaned_text_lines)

    async def get_processed_documents(self, search_query: str, num_search_results: int = 2) -> List[Dict[str, str]]:
        """
        DuckDuckGoで検索し、上位N件の検索結果のURLからクリーンなHTMLコンテンツを取得します。

        Args:
            search_query (str): 検索クエリ。
            num_search_results (int): 取得する検索結果の数。

        Returns:
            List[Dict[str, str]]: 処理されたドキュメントのリスト。
                                  各ドキュメントは 'title', 'original_url', 'cleaned_html_content' を含む。
        """
        processed_documents = []
        
        # クラスのインスタンスでブラウザのライフサイクルを管理
        try:
            top_results = await self.search_duckduckgo(search_query, num_search_results)

            if top_results:
                for i, result in enumerate(top_results):
                    print(f"\n--- 処理中の記事 {i+1} ---")
                    print(f"タイトル: {result['title']}")
                    print(f"元URL: {result['url']}")
                    
                    # 個別のURLのコンテンツを取得・クリーンアップ
                    raw_html = await self._get_raw_html_content(result['url'])
                    
                    if raw_html:
                        cleaned_content = self._clean_html_to_text(raw_html)
                        processed_documents.append({
                            "title": result['title'],
                            "original_url": result['url'],
                            "cleaned_html_content": cleaned_content
                        })
                        print(f"  クリーンなコンテンツの長さ: {len(cleaned_content)} 文字")
                        print(f"  クリーンなコンテンツ(一部):\n{cleaned_content[:500]}...")
                    else:
                        print("  クリーンなコンテンツを取得できませんでした。")
            else:
                print("検索結果が見つからなかったため、処理をスキップします。")
        finally:
            # すべての処理が完了したらブラウザを閉じる
            await self._close_browser()

        return processed_documents

# クラスの使用例
async def main():
    scraper = WebScraper(headless=False) # デバッグのためにheadless=Falseにしても良い
    query = "個人情報流出 事例"
    documents = await scraper.get_processed_documents(query, num_search_results=2)

    if documents:
        print("\n--- 処理されたドキュメント ---")
        for doc in documents:
            print(f"タイトル: {doc['title']}")
            print(f"URL: {doc['original_url']}")
            # print(f"コンテンツの長さ: {len(doc['cleaned_html_content'])} 文字")
            # print(f"コンテンツの一部: {doc['cleaned_html_content'][:200]}...\n")
    else:
        print("処理されたドキュメントはありませんでした。")

if __name__ == "__main__":
    asyncio.run(main())