File size: 7,906 Bytes
dbaa71b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# Code in this file is copied from https://github.com/egbertbouman/youtube-comment-downloader/blob/master/youtube_comment_downloader/downloader.py
# and modified to fit the needs of this project. When code from youtube-comment-downloader was copied it was MIT licensed.
# Code Commit: https://github.com/egbertbouman/youtube-comment-downloader/commit/9a15b8e3fbaebad660875409fb1bbe74db17f304

import json
import logging
import time
import re
from datetime import datetime, timezone

import dateparser
from typing import Optional, Any, List, Dict, Generator

import requests
from pydantic import BaseModel
from requests import Session

logger = logging.getLogger(__name__)


class YouTubeCommentExtractor(BaseModel):
    """Extract comments from a YouTube video page via YouTube's internal
    ("innertube") AJAX API.

    Works by scraping the watch page for the ``ytcfg`` configuration blob and
    the ``ytInitialData`` payload, then following pagination "continuation"
    tokens. Adapted from youtube-comment-downloader (MIT licensed); see the
    provenance comment at the top of this file.
    """

    # Internal scraping constants (pydantic treats leading-underscore names
    # as non-field class attributes).
    _YT_URL: str = 'https://www.youtube.com'
    _YT_CFG_REGEX: str = r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;'
    _YT_INITIAL_DATA_REGEX: str = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*=\s*({.+?})\s*;\s*(?:var\s+meta|</script|\n)'

    video_url: str  # full URL of the video whose comments are fetched
    user_agent: str = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36'
    sort_by: int = 1  # 0 = sort by popular, 1 = sort by recent
    max_comments: Optional[int] = 20  # stop after this many comments; None/0 = unlimited
    fetch_replies: bool = False  # also walk reply threads (currently flaky, see TODO below)
    lang_code: Optional[str] = None  # override YouTube UI language, e.g. 'en'
    sleep_time: float = 0.1  # polite delay between paginated requests, in seconds
    request_retries: int = 5  # attempts per AJAX call before giving up

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)

        # Validate eagerly so a bad value fails at construction time rather
        # than deep inside _fetch_comments.
        if self.sort_by not in (0, 1):
            raise ValueError('sort_by must be either 0 or 1')

    @staticmethod
    def _regex_search(text: str, pattern: str, group: int = 1) -> str:
        """Return capture group *group* of the first match of *pattern* in
        *text*, or ``''`` when there is no match."""
        match = re.search(pattern, text)
        return match.group(group) if match else ''

    def _ajax_request(self, session: Session, endpoint: Dict[str, Any], ytcfg: Dict[str, Any]) -> Any:
        """POST a continuation request against the innertube API.

        Retries up to ``request_retries`` times on transient failures.
        Returns the parsed JSON response on success, or ``{}`` on a
        permanent error (403/413) or once all retries are exhausted.
        """
        url = self._YT_URL + endpoint['commandMetadata']['webCommandMetadata']['apiUrl']

        data = {'context': ytcfg['INNERTUBE_CONTEXT'],
                'continuation': endpoint['continuationCommand']['token']}

        for _ in range(self.request_retries):
            response = session.post(url, params={'key': ytcfg['INNERTUBE_API_KEY']}, json=data)
            if response.status_code == 200:
                return response.json()
            if response.status_code in (403, 413):
                # Forbidden / payload too large: retrying will not help.
                return {}
            time.sleep(self.sleep_time)
        # BUGFIX: previously fell off the loop returning None implicitly;
        # return an empty dict so all failure paths look the same to callers.
        return {}

    @staticmethod
    def _search_dict(partial: Any, search_key: str) -> Generator[Any, Any, None]:
        """Depth-first search of a nested dict/list structure, yielding every
        value stored under a key equal to *search_key* (at any depth)."""
        stack = [partial]
        while stack:
            current_item = stack.pop()
            if isinstance(current_item, dict):
                for key, value in current_item.items():
                    if key == search_key:
                        yield value
                    else:
                        stack.append(value)
            elif isinstance(current_item, list):
                for value in current_item:
                    stack.append(value)

    def _fetch_comments(self, until_datetime: Optional[datetime] = None) -> Generator[Any, Any, None]:
        """Yield comment dicts for ``video_url``, newest-first pagination.

        Stops early when a comment older than *until_datetime* (assumed UTC)
        is encountered, when comments are disabled, or when the page layout
        cannot be parsed.
        """
        session = requests.Session()
        session.headers['User-Agent'] = self.user_agent
        response = session.get(self.video_url)

        # 'uxe=' in the redirect URL signals the EU consent interstitial;
        # set the consent cookie and re-fetch the real watch page.
        if response.request and response.request.url and 'uxe=' in response.request.url:
            session.cookies.set('CONSENT', 'YES+cb', domain='.youtube.com')  # type: ignore[no-untyped-call]
            response = session.get(self.video_url)

        html = response.text
        # BUGFIX: _regex_search returns '' on no match, and json.loads('')
        # raises JSONDecodeError -- guard BEFORE parsing instead of after.
        ytcfg_json = self._regex_search(html, self._YT_CFG_REGEX)
        if not ytcfg_json:
            return  # Unable to extract configuration
        ytcfg = json.loads(ytcfg_json)
        if self.lang_code:
            ytcfg['INNERTUBE_CONTEXT']['client']['hl'] = self.lang_code

        initial_data_json = self._regex_search(html, self._YT_INITIAL_DATA_REGEX)
        if not initial_data_json:
            return  # Page layout changed or this is not a watch page
        data = json.loads(initial_data_json)

        section = next(self._search_dict(data, 'itemSectionRenderer'), None)
        renderer = next(self._search_dict(section, 'continuationItemRenderer'), None) if section else None
        if not renderer:
            # Comments disabled?
            return

        needs_sorting = self.sort_by != 0
        continuations = [renderer['continuationEndpoint']]
        while continuations:
            continuation = continuations.pop()
            response = self._ajax_request(session, continuation, ytcfg)

            if not response:
                break
            if list(self._search_dict(response, 'externalErrorMessage')):
                logger.warning('Error returned from server: %s', next(self._search_dict(response, 'externalErrorMessage')))
                return

            if needs_sorting:
                # The first response carries a sort sub-menu; re-request with
                # the endpoint matching the requested sort order.
                sub_menu: Dict[str, Any] = next(self._search_dict(response, 'sortFilterSubMenuRenderer'), {})
                sort_menu = sub_menu.get('subMenuItems', [])
                if self.sort_by < len(sort_menu):
                    continuations = [sort_menu[self.sort_by]['serviceEndpoint']]
                    needs_sorting = False
                    continue
                # TODO: Fix it. Causing observer to fail silently
                logger.warning("Unable to set sorting")

            actions = list(self._search_dict(response, 'reloadContinuationItemsCommand')) + \
                      list(self._search_dict(response, 'appendContinuationItemsAction'))

            for action in actions:
                for item in action.get('continuationItems', []):
                    if action['targetId'] == 'comments-section':
                        # Process continuations for comments and replies.
                        continuations[:0] = list(self._search_dict(item, 'continuationEndpoint'))
                    if self.fetch_replies:
                        # TODO: Fix it. This functionality is broken
                        if action['targetId'].startswith('comment-replies-item') and 'continuationItemRenderer' in item:
                            # Process the 'Show more replies' button
                            continuations.append(next(self._search_dict(item, 'buttonRenderer'))['command'])

            for comment in reversed(list(self._search_dict(response, 'commentRenderer'))):
                # Reply IDs contain a '.' (parent.child); skip them when
                # replies are not requested.
                if not self.fetch_replies and "." in comment['commentId']:
                    continue

                comment_time_string = comment['publishedTimeText']['runs'][0]['text']
                comment_time_string = comment_time_string or ''
                # Relative timestamps like "2 days ago (edited)" -- strip the
                # edit marker before parsing.
                comment_time = dateparser.parse(
                    comment_time_string.split('(edited)', 1)[0].strip(),
                )

                if comment_time:
                    # dateparser yields a naive datetime; pin it to UTC so
                    # comparisons against aware datetimes work.
                    comment_time = comment_time.replace(tzinfo=timezone.utc)
                    if until_datetime and until_datetime > comment_time:
                        return

                yield {'comment_id': comment['commentId'],
                       'text': ''.join([c['text'] for c in comment['contentText'].get('runs', [])]),
                       'time': comment_time,
                       'author': comment.get('authorText', {}).get('simpleText', ''),
                       'channel': comment['authorEndpoint']['browseEndpoint'].get('browseId', ''),
                       'votes': comment.get('voteCount', {}).get('simpleText', '0'),
                       'photo': comment['authorThumbnail']['thumbnails'][-1]['url'],
                       'heart': next(self._search_dict(comment, 'isHearted'), False)}

            time.sleep(self.sleep_time)

    def fetch_comments(self, until_datetime: Optional[datetime] = None) -> List[Dict[str, Any]]:
        """Collect comments into a list, honoring ``max_comments``.

        :param until_datetime: stop once comments older than this (UTC) are seen
        :return: list of comment dicts (see ``_fetch_comments`` for keys)
        """
        comments: List[Dict[str, Any]] = []
        for comment in self._fetch_comments(until_datetime=until_datetime):
            comments.append(comment)
            # BUGFIX: use >= rather than == so an over-full batch can never
            # slip past the cutoff.
            if self.max_comments and len(comments) >= self.max_comments:
                break

        return comments