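"""YouTube comment scrapper source for obsei.

Fetches comments for a given video via ``YouTubeCommentExtractor`` and
emits them as ``TextPayload`` objects.
"""
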
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

from pydantic import PrivateAttr

from obsei.misc.utils import DEFAULT_LOOKUP_PERIOD, convert_utc_time, DATETIME_STRING_PATTERN
from obsei.misc.youtube_reviews_scrapper import YouTubeCommentExtractor
from obsei.payload import TextPayload
from obsei.source.base_source import BaseSource, BaseSourceConfig

logger = logging.getLogger(__name__)


class YoutubeScrapperConfig(BaseSourceConfig):
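    """Configuration for scraping comments from a single YouTube video.

    Either ``video_id`` or ``video_url`` must be provided; when only
    ``video_id`` is given, the watch URL is derived from it.
    """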
    _YT_VIDEO_URL: str = PrivateAttr('https://www.youtube.com/watch?v={video_id}')
    TYPE: str = "YoutubeScrapper"
    video_id: Optional[str] = None
    video_url: Optional[str] = None
    user_agent: str = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36'
    sort_by: int = 1  # 0 = sort by popular, 1 = sort by recent
    max_comments: Optional[int] = 20
    fetch_replies: bool = False
    lang_code: Optional[str] = None
    sleep_time: float = 0.1
    request_retries: int = 5
    lookup_period: Optional[str] = None

    def __init__(self, **data: Any):
        super().__init__(**data)

        if not self.video_id and not self.video_url:
            raise ValueError("Either `video_id` or `video_url` is required")

        if not self.video_url:
            self.video_url = self._YT_VIDEO_URL.format(video_id=self.video_id)


class YoutubeScrapperSource(BaseSource):
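    """Source that emits YouTube video comments as ``TextPayload`` objects."""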
    NAME: Optional[str] = "YoutubeScrapper"

    def lookup(self, config: YoutubeScrapperConfig, **kwargs: Any) -> List[TextPayload]:  # type: ignore[override]
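        """Fetch comments posted since the last lookup and wrap them in payloads.

        If ``kwargs["id"]`` names a workflow and a state store is attached,
        the last seen comment time and id are read from and written back to
        that store so subsequent lookups only return new comments.
        """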
        source_responses: List[TextPayload] = []

        # Pull incremental-lookup state for this workflow, if one is identified
        identifier: Optional[str] = kwargs.get("id", None)
        state: Optional[Dict[str, Any]] = (
            None
            if identifier is None or self.store is None
            else self.store.get_source_state(identifier)
        )
        update_state: bool = bool(identifier)
        state = state or dict()

        lookup_period: str = state.get("since_time", config.lookup_period)
        lookup_period = lookup_period or DEFAULT_LOOKUP_PERIOD
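        # Short values like "1h" or "30d" are treated as relative periods;
        # anything longer is parsed as an absolute DATETIME_STRING_PATTERN stamp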
        if len(lookup_period) <= 5:
            since_time = convert_utc_time(lookup_period)
        else:
            since_time = datetime.strptime(lookup_period, DATETIME_STRING_PATTERN)

        last_since_time: datetime = since_time
        since_id: Optional[str] = state.get("since_id", None)
        last_index = since_id

        comments: Optional[List[Dict[str, Any]]] = None
        try:
            if not config.video_url:
                raise RuntimeError("`video_url` in config should not be empty or None")

            scrapper: YouTubeCommentExtractor = YouTubeCommentExtractor(
                video_url=config.video_url,
                user_agent=config.user_agent,
                sort_by=config.sort_by,
                max_comments=config.max_comments,
                fetch_replies=config.fetch_replies,
                lang_code=config.lang_code,
                sleep_time=config.sleep_time,
                request_retries=config.request_retries,
            )

            comments = scrapper.fetch_comments(until_datetime=since_time)
        except RuntimeError as ex:
            logger.warning("Unable to fetch comments: %s", ex)

        comments = comments or []

        for comment in comments:
            source_responses.append(
                TextPayload(
                    processed_text=comment["text"],
                    meta=comment,
                    source_name=self.NAME,
                )
            )

            comment_time = comment["time"]

            if comment_time is not None and last_since_time < comment_time:
                last_since_time = comment_time
            if last_index is None:
                # Assuming list is sorted based on time
                last_index = comment["comment_id"]

        state["since_time"] = last_since_time.strftime(DATETIME_STRING_PATTERN)
        state["since_id"] = last_index

        if update_state and self.store is not None:
            self.store.update_source_state(workflow_id=identifier, state=state)

        return source_responses
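

# --- Example usage (illustrative sketch, not part of the library API) ---
# Runs a one-off lookup without a persistent state store. "VIDEO_ID" is a
# placeholder; replace it with a real YouTube video id before running.
if __name__ == "__main__":
    example_config = YoutubeScrapperConfig(
        video_id="VIDEO_ID",  # placeholder, not a real video id
        max_comments=10,
        lookup_period="1d",  # fetch comments from roughly the last day
    )
    example_source = YoutubeScrapperSource()
    for payload in example_source.lookup(example_config):
        print(payload.processed_text)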