File size: 13,585 Bytes
b42a7a4
68c46ce
b42a7a4
68c46ce
b42a7a4
 
31d2be8
b42a7a4
 
 
 
 
31d2be8
 
 
 
 
 
 
 
 
 
 
 
 
 
b42a7a4
 
 
 
 
 
 
 
 
 
68c46ce
 
b42a7a4
 
68c46ce
 
 
b42a7a4
68c46ce
 
31d2be8
 
 
 
 
 
 
 
 
 
 
b42a7a4
 
 
68c46ce
b42a7a4
 
 
68c46ce
 
 
b42a7a4
 
 
68c46ce
b42a7a4
 
31d2be8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b42a7a4
 
 
 
 
 
 
68c46ce
 
b42a7a4
 
31d2be8
 
 
 
 
 
 
 
68c46ce
31d2be8
 
 
 
 
 
 
 
 
 
 
b42a7a4
68c46ce
b42a7a4
68c46ce
 
b42a7a4
68c46ce
 
b42a7a4
68c46ce
b42a7a4
31d2be8
68c46ce
b42a7a4
 
 
68c46ce
 
 
31d2be8
 
 
 
 
68c46ce
b42a7a4
 
 
 
 
 
 
 
 
 
 
 
 
 
31d2be8
 
 
b42a7a4
 
31d2be8
b42a7a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68c46ce
b42a7a4
 
 
 
 
 
 
 
 
 
 
 
 
31d2be8
b42a7a4
 
 
68c46ce
 
b42a7a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31d2be8
 
b42a7a4
31d2be8
b42a7a4
 
68c46ce
 
 
b42a7a4
 
 
 
68c46ce
31d2be8
68c46ce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import asyncio
from playwright.async_api import async_playwright, Page, Browser, Playwright
from bs4 import BeautifulSoup
from bs4.element import Comment
from urllib.parse import urlparse, parse_qs
from typing import List, Dict, Optional
import random # randomモジュールを追加

class WebScraper:
    """
    DuckDuckGoでの検索、URLからのコンテンツ取得、HTMLクリーンアップを行うクラス。
    """
    # User-Agentのリストをクラス変数として定義
    USER_AGENTS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/108.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/108.0",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", # 最新版に近いChrome
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15", # Safari
        "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 OPR/106.0.0.0", # Opera
    ]

    def __init__(self, headless: bool = True, default_timeout: int = 30000):
        """
        WebScraperのインスタンスを初期化します。

        Args:
            headless (bool): Playwrightをヘッドレスモードで実行するかどうか (デフォルト: True)。
            default_timeout (int): ページのロードタイムアウト (ミリ秒、デフォルト: 30000 = 30秒)。
        """
        self.headless = headless
        self.default_timeout = default_timeout
        self._browser: Optional[Browser] = None
        self._playwright_instance: Optional[Playwright] = None # Playwrightインスタンスを保持

    async def _launch_browser(self) -> Browser:
        """Playwrightを起動し、ブラウザを立ち上げます。
        既にブラウザが起動していればそれを再利用します。
        """
        if not self._browser or not self._browser.is_connected():
            if self._playwright_instance is None:
                self._playwright_instance = await async_playwright().start()
            # ヘッドレスモードでの検出を避けるための引数を追加
            self._browser = await self._playwright_instance.chromium.launch(
                headless=self.headless,
                args=[
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                    '--disable-infobars',
                    '--window-size=1280,720', # 一般的なデスクトップサイズに設定
                    '--disable-blink-features=AutomationControlled' # ヘッドレス検出を回避
                ]
            )
        return self._browser

    async def _close_browser(self):
        """ブラウザを閉じ、Playwrightインスタンスも停止します。"""
        if self._browser and self._browser.is_connected():
            await self._browser.close()
        self._browser = None
        if self._playwright_instance:
            await self._playwright_instance.stop()
            self._playwright_instance = None

    async def _get_new_page(self) -> Page:
        """新しいページ(タブ)を作成します。"""
        browser = await self._launch_browser() # ブラウザが起動または取得される
        page = await browser.new_page()
        page.set_default_timeout(self.default_timeout)
        
        # User-Agentをランダムに選択して設定
        await page.set_extra_http_headers({
            "User-Agent": random.choice(self.USER_AGENTS)
        })

        # より包括的なステルス対策をページに適用
        await page.evaluate("""Object.defineProperty(navigator, 'webdriver', { get: () => false });""")
        await page.evaluate("""Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });""")
        await page.evaluate("""Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });""")
        await page.evaluate("""window.chrome = { runtime: {}, loadTimes: function() {}, csi: function() {}, app: {} };""")
        await page.evaluate("""Object.defineProperty(navigator.permissions, 'query', { enumerable: true, configurable: true, writable: true, value: async (parameters) => ({ state: 'prompt' }) });""")
        
        # ページロード後の追加待機
        await asyncio.sleep(random.uniform(2, 5)) 

        return page

    async def search_duckduckgo(self, query: str, num_results: int = 3) -> List[Dict[str, str]]:
        """
        DuckDuckGoで指定されたクエリを検索し、上位N件の検索結果(タイトルとURL)を返します。
        """
        results = []
        page: Optional[Page] = None
        
        try:
            page = await self._get_new_page()
            
            print(f"Bingで '{query}' を検索中...")
            # networkidle でより安定したページロードを待機
            await page.goto(f"https://www.bing.com/search?q=={query}&setlang=ja", wait_until='networkidle') 
            
            # デバッグのためにページのスクリーンショットを保存
            await page.screenshot(path="./duckduckgo_search_results.png")
            
            # 検索結果のタイトルリンク要素を特定するセレクタ
            # DuckDuckGoのHTML構造は変更される可能性があるため、適宜調整が必要です。
            # 現在の一般的なセレクタは 'a[data-testid="result-title-link"]' もしくは 'h2 > a' ですが、
            # ページ構造が変わった場合は、開発者ツールで適切なセレクタを見つけてください。
            
            # 要素が見つかるまで、より長く待機するか、別のセレクタを試す
            try:
                await page.wait_for_selector('h2 > a', timeout=20000) # タイムアウトを長くする
            except Exception as e:
                print(f"セレクタ 'h2 > a' の待機中にタイムアウトしました: {e}")
                # ここで代替のセレクタを試すか、処理を終了する
                return []

            search_links = page.locator('h2 > a')

            for i in range(min(num_results, await search_links.count())):
                link_element = search_links.nth(i)
                
                title = await link_element.text_content()
                url = await link_element.get_attribute("href")
                
                if url:
                    parsed_url = urlparse(url)
                    # DuckDuckGoのリダイレクトURLのデコードとクリーンアップ
                    if parsed_url.netloc == 'duckduckgo.com' and parsed_url.path == '/l/':
                        decoded_url = parse_qs(parsed_url.query).get('uddg', [''])[0]
                        url = decoded_url
                    
                    # 結果を追加する前に、タイトルとURLが有効か軽くチェック
                    if title and url and title.strip() != "" and url.strip() != "":
                        results.append({"title": title.strip(), "url": url.strip()})
            
            # 検索結果が一つも見つからなかった場合もスクリーンショットを保存
            if not results:
                print(f"検索結果が見つかりませんでした。ページのスクリーンショットを './duckduckgo_no_results.png' に保存します。")
                await page.screenshot(path="./duckduckgo_no_results.png")

        except Exception as e:
            print(f"DuckDuckGo検索中にエラーが発生しました: {e}")
        finally:
            if page:
                await page.close() # ページを閉じる
        
        print(f"検索が完了しました。{len(results)} 件の結果が見つかりました。")
        return results

    async def _get_raw_html_content(self, url: str) -> Optional[str]:
        """指定されたURLから生のHTMLコンテンツを取得します。"""
        page: Optional[Page] = None
        try:
            page = await self._get_new_page()
            print(f"   URL: {url} のコンテンツを取得中...")
            # networkidle でより安定したページロードを待機
            await page.goto(url) 
            return await page.content()
        except Exception as e:
            print(f"   URL: {url} のコンテンツ取得中にエラーが発生しました: {e}")
            return None
        finally:
            if page:
                await page.close()

    def _clean_html_to_text(self, html_content: str) -> str:
        """
        HTMLコンテンツからJavaScript、スタイル、不要なリンクなどを除去し、整形されたテキストを返します。
        """
        soup = BeautifulSoup(html_content, 'html.parser')

        # スクリプトタグとスタイルタグを削除
        for script_or_style in soup(["script", "style"]):
            script_or_style.decompose()

        # headタグ内のリンクタグ(CSSなど)を削除
        if soup.head:
            for link_tag in soup.head.find_all('link'):
                link_tag.decompose()
        
        # HTMLコメントを削除
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()

        # 複数の連続する改行を1つに減らす
        cleaned_text = soup.get_text(separator='\n', strip=True)
        # 空行を削除し、各行をトリム
        cleaned_text_lines = [line.strip() for line in cleaned_text.splitlines() if line.strip()]
        return '\n'.join(cleaned_text_lines)

    async def get_processed_documents(self, search_query: str, num_search_results: int = 2) -> List[Dict[str, str]]:
        """
        DuckDuckGoで検索し、上位N件の検索結果のURLからクリーンなHTMLコンテンツを取得します。

        Args:
            search_query (str): 検索クエリ。
            num_search_results (int): 取得する検索結果の数。

        Returns:
            List[Dict[str, str]]: 処理されたドキュメントのリスト。
                                    各ドキュメントは 'title', 'original_url', 'cleaned_html_content' を含む。
        """
        processed_documents = []
        
        # クラスのインスタンスでブラウザのライフサイクルを管理
        try:
            top_results = await self.search_duckduckgo(search_query, num_search_results)

            if top_results:
                for i, result in enumerate(top_results):
                    print(f"\n--- 処理中の記事 {i+1} ---")
                    print(f"タイトル: {result['title']}")
                    print(f"元URL: {result['url']}")
                    
                    # 個別のURLのコンテンツを取得・クリーンアップ
                    raw_html = await self._get_raw_html_content(result['url'])
                    
                    if raw_html:
                        cleaned_content = self._clean_html_to_text(raw_html)
                        processed_documents.append({
                            "title": result['title'],
                            "original_url": result['url'],
                            "cleaned_html_content": cleaned_content
                        })
                        print(f"   クリーンなコンテンツの長さ: {len(cleaned_content)} 文字")
                        print(f"   クリーンなコンテンツ(一部):\n{cleaned_content[:500]}...")
                    else:
                        print("   クリーンなコンテンツを取得できませんでした。")
            else:
                print("検索結果が見つからなかったため、処理をスキップします。")
        finally:
            # すべての処理が完了したらブラウザを閉じる
            await self._close_browser()

        return processed_documents

# クラスの使用例
async def main():
    scraper = WebScraper(headless=False) # まずはheadless=Trueで試してください
    query = "個人情報流出 事例"
    documents = await scraper.get_processed_documents(query, num_search_results=2)

    if documents:
        print("\n--- 処理されたドキュメント ---")
        for doc in documents:
            print(f"タイトル: {doc['title']}")
            print(f"URL: {doc['original_url']}")
            # print(f"コンテンツの長さ: {len(doc['cleaned_html_content'])} 文字")
            # print(f"コンテンツの一部: {doc['cleaned_html_content'][:200]}...\n")
    else:
        print("処理されたドキュメントはありませんでした。")

if __name__ == "__main__":
    asyncio.run(main())