# Code in this file is copied from https://github.com/egbertbouman/youtube-comment-downloader/blob/master/youtube_comment_downloader/downloader.py
# and modified to fit the needs of this project. When code from youtube-comment-downloader was copied it was MIT licensed.
# Code Commit: https://github.com/egbertbouman/youtube-comment-downloader/commit/9a15b8e3fbaebad660875409fb1bbe74db17f304
import json
import logging
import time
import re
from datetime import datetime, timezone
import dateparser
from typing import Optional, Any, List, Dict, Generator
import requests
from pydantic import BaseModel
from requests import Session
logger = logging.getLogger(__name__)
class YouTubeCommentExtractor(BaseModel):
    """Extract comments from a YouTube video by replaying the site's internal
    ("innertube") continuation AJAX requests.

    Adapted from youtube-comment-downloader (MIT licensed); see the module
    header for the exact upstream commit.
    """

    # Pseudo-constants; the leading underscore keeps pydantic from treating
    # them as model fields.
    _YT_URL: str = 'https://www.youtube.com'
    _YT_CFG_REGEX: str = r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;'
    _YT_INITIAL_DATA_REGEX: str = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*=\s*({.+?})\s*;\s*(?:var\s+meta|</script|\n)'

    video_url: str
    user_agent: str = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36'
    sort_by: int = 1  # 0 = sort by popular, 1 = sort by recent
    max_comments: Optional[int] = 20  # None disables the limit
    fetch_replies: bool = False
    lang_code: Optional[str] = None  # e.g. 'en'; overrides YouTube's 'hl' client setting
    sleep_time: float = 0.1  # seconds to wait between pages and between retries
    request_retries: int = 5

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        if self.sort_by not in (0, 1):
            raise ValueError('sort_by must be either 0 or 1')

    @staticmethod
    def _regex_search(text: str, pattern: str, group: int = 1) -> str:
        """Return the requested group of the first match of *pattern* in
        *text*, or '' when there is no match."""
        match = re.search(pattern, text)
        return match.group(group) if match else ''

    def _ajax_request(self, session: Session, endpoint: Dict[str, Any], ytcfg: Dict[str, Any]) -> Any:
        """POST the continuation request described by *endpoint* and return
        the parsed JSON response.

        Retries up to ``request_retries`` times, sleeping ``sleep_time``
        between attempts. Returns {} on 403/413 (retrying cannot help) or
        when all retries are exhausted.
        """
        url = self._YT_URL + endpoint['commandMetadata']['webCommandMetadata']['apiUrl']
        data = {'context': ytcfg['INNERTUBE_CONTEXT'],
                'continuation': endpoint['continuationCommand']['token']}
        for _ in range(self.request_retries):
            response = session.post(url, params={'key': ytcfg['INNERTUBE_API_KEY']}, json=data)
            if response.status_code == 200:
                return response.json()
            if response.status_code in (403, 413):
                return {}
            time.sleep(self.sleep_time)
        # Bug fix: previously fell off the loop and implicitly returned None
        # after exhausting retries; return {} so callers get a consistent
        # falsy dict on every failure path.
        return {}

    @staticmethod
    def _search_dict(partial: Any, search_key: str) -> Generator[Any, Any, None]:
        """Depth-first search of a nested dict/list structure, yielding every
        value stored under *search_key* at any depth."""
        stack = [partial]
        while stack:
            current_item = stack.pop()
            if isinstance(current_item, dict):
                for key, value in current_item.items():
                    if key == search_key:
                        yield value
                    else:
                        stack.append(value)
            elif isinstance(current_item, list):
                stack.extend(current_item)

    def _fetch_comments(self, until_datetime: Optional[datetime] = None) -> Generator[Any, Any, None]:
        """Yield comment dicts for ``video_url``, page by page.

        Stops early when *until_datetime* is given and a comment older than
        it is seen. NOTE(review): parsed comment times are forced to UTC, so
        *until_datetime* should be timezone-aware — confirm at call sites.
        """
        session = requests.Session()
        session.headers['User-Agent'] = self.user_agent
        response = session.get(self.video_url)
        # A 'uxe=' redirect looks like the EU consent interstitial; set the
        # consent cookie and fetch the page again.
        if response.request and response.request.url and 'uxe=' in response.request.url:
            session.cookies.set('CONSENT', 'YES+cb', domain='.youtube.com')  # type: ignore[no-untyped-call]
            response = session.get(self.video_url)
        html = response.text
        ytcfg = json.loads(self._regex_search(html, self._YT_CFG_REGEX))
        if not ytcfg:
            return  # Unable to extract configuration
        if self.lang_code:
            ytcfg['INNERTUBE_CONTEXT']['client']['hl'] = self.lang_code
        data = json.loads(self._regex_search(html, self._YT_INITIAL_DATA_REGEX))
        section = next(self._search_dict(data, 'itemSectionRenderer'), None)
        renderer = next(self._search_dict(section, 'continuationItemRenderer'), None) if section else None
        if not renderer:
            # Comments disabled?
            return
        needs_sorting = self.sort_by != 0  # popular (0) is the server default
        continuations = [renderer['continuationEndpoint']]
        while continuations:
            continuation = continuations.pop()
            response = self._ajax_request(session, continuation, ytcfg)
            if not response:
                break
            # Fix: search the response tree once instead of twice.
            errors = list(self._search_dict(response, 'externalErrorMessage'))
            if errors:
                logger.warning('Error returned from server: %s', errors[0])
                return
            if needs_sorting:
                sub_menu: Dict[str, Any] = next(self._search_dict(response, 'sortFilterSubMenuRenderer'), {})
                sort_menu = sub_menu.get('subMenuItems', [])
                if self.sort_by < len(sort_menu):
                    # Restart pagination from the endpoint of the requested order.
                    continuations = [sort_menu[self.sort_by]['serviceEndpoint']]
                    needs_sorting = False
                    continue
                # TODO: Fix it. Causing observer to fail silently
                logger.warning("Unable to set sorting")
                # raise RuntimeError('Failed to set sorting')
            actions = list(self._search_dict(response, 'reloadContinuationItemsCommand')) + \
                list(self._search_dict(response, 'appendContinuationItemsAction'))
            for action in actions:
                for item in action.get('continuationItems', []):
                    if action['targetId'] == 'comments-section':
                        # Process continuations for comments and replies.
                        continuations[:0] = list(self._search_dict(item, 'continuationEndpoint'))
                    if self.fetch_replies:
                        # TODO: Fix it. This functionality is broken
                        if action['targetId'].startswith('comment-replies-item') and 'continuationItemRenderer' in item:
                            # Process the 'Show more replies' button
                            continuations.append(next(self._search_dict(item, 'buttonRenderer'))['command'])
            for comment in reversed(list(self._search_dict(response, 'commentRenderer'))):
                # Reply IDs contain a '.'; skip replies unless requested.
                if not self.fetch_replies and "." in comment['commentId']:
                    continue
                comment_time_string = comment['publishedTimeText']['runs'][0]['text'] or ''
                # Timestamps are relative ('2 days ago (edited)'); strip the
                # edit marker before handing the string to dateparser.
                comment_time = dateparser.parse(
                    comment_time_string.split('(edited)', 1)[0].strip(),
                )
                if comment_time:
                    comment_time = comment_time.replace(tzinfo=timezone.utc)
                    if until_datetime and until_datetime > comment_time:
                        return
                yield {'comment_id': comment['commentId'],
                       'text': ''.join([c['text'] for c in comment['contentText'].get('runs', [])]),
                       'time': comment_time,  # None when parsing failed
                       'author': comment.get('authorText', {}).get('simpleText', ''),
                       'channel': comment['authorEndpoint']['browseEndpoint'].get('browseId', ''),
                       'votes': comment.get('voteCount', {}).get('simpleText', '0'),
                       'photo': comment['authorThumbnail']['thumbnails'][-1]['url'],
                       'heart': next(self._search_dict(comment, 'isHearted'), False)}
            time.sleep(self.sleep_time)

    def fetch_comments(self, until_datetime: Optional[datetime] = None) -> List[Dict[str, Any]]:
        """Collect up to ``max_comments`` comments (all of them when
        ``max_comments`` is None), optionally stopping at *until_datetime*."""
        comments: List[Dict[str, Any]] = []
        for comment in self._fetch_comments(until_datetime=until_datetime):
            comments.append(comment)
            # '>=' rather than '==' so the cap also holds if max_comments is
            # ever lowered between iterations.
            if self.max_comments and len(comments) >= self.max_comments:
                break
        return comments