import os
import re
import gradio as gr
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
import time
import traceback
import tempfile
from datetime import datetime, timedelta, timezone

# --- Constants ---
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"
API_KEY = os.environ.get("YT_API_KEY")  # Set the YT_API_KEY environment variable to your YouTube Data API key
DEFAULT_KEYWORDS = "3d"
DEFAULT_DAYS = 180  # Default to 6 months
DEFAULT_MAX_VIDEOS = 100  # Default to 100 videos
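
# Note on quota (standard YouTube Data API v3 costs, worth re-checking for your
# project): search.list costs 100 units per call, while channels.list,
# playlistItems.list, and videos.list cost 1 unit each. Passing a channel ID
# directly instead of a handle therefore skips the most expensive lookup.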

# --- YouTube API Helper Functions ---

def get_youtube_service():
    """Initializes and returns the YouTube API service."""
    try:
        return build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION, developerKey=API_KEY, cache_discovery=False)
    except HttpError as e:
        raise ConnectionError(f"Could not connect to YouTube API: {e}")

def get_channel_id(service, handle):
    """Gets the channel ID from a handle or ID."""
    if not handle:
        raise ValueError("Channel handle or ID is required.")
    if handle.startswith("UC") and len(handle) == 24:
        return handle
    handle = handle if handle.startswith('@') else f"@{handle}"
    try:
        search_response = service.search().list(q=handle, part="id", type="channel", maxResults=1).execute()
        if not search_response.get("items"):
            raise ValueError(f"Channel '{handle}' not found.")
        return search_response["items"][0]["id"]["channelId"]
    except HttpError as e:
        raise ConnectionError(f"API error finding channel ID: {e.content}")
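
# Usage sketch (hypothetical values; real channel IDs are "UC" + 22 characters):
#   get_channel_id(service, "theAIsearch")              # "@" is prepended, then resolved via search
#   get_channel_id(service, "UCxxxxxxxxxxxxxxxxxxxxxx") # returned unchanged
# A cheaper alternative worth considering: channels().list(forHandle=handle)
# resolves a handle for 1 quota unit, versus 100 for search().list.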

def get_uploads_playlist_id(service, channel_id):
    """Gets the uploads playlist ID."""
    try:
        response = service.channels().list(id=channel_id, part="contentDetails").execute()
        if not response.get("items"):
            raise ValueError(f"No channel details for ID '{channel_id}'.")
        return response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]
    except HttpError as e:
        raise ConnectionError(f"API error getting uploads playlist: {e.content}")

def get_all_video_ids(service, playlist_id, keywords_set, days_filter, max_videos):
    """Fetches video IDs with pre-filtering by keywords, date, and max limit."""
    video_ids = []
    next_page_token = None
    # Build the cutoff in UTC to match publishedAt (an RFC 3339 "Z" timestamp),
    # so the string comparison is not skewed by the server's local timezone.
    cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days_filter)).strftime("%Y-%m-%dT%H:%M:%SZ")
    
    while True:
        try:
            response = service.playlistItems().list(
                playlistId=playlist_id,
                part="snippet,contentDetails",
                maxResults=50,
                pageToken=next_page_token
            ).execute()
            for item in response.get("items", []):
                video_id = item["contentDetails"]["videoId"]
                snippet = item["snippet"]
                title = snippet["title"].lower()
                description = snippet.get("description", "").lower()
                published_at = snippet["publishedAt"]
                
                # Skip (rather than stop at) older videos: uploads playlists are
                # usually newest-first, but the API does not guarantee ordering.
                if published_at < cutoff_date:
                    continue
                
                if any(keyword in title or keyword in description for keyword in keywords_set):
                    video_ids.append(video_id)
                
                if len(video_ids) >= max_videos:
                    return video_ids[:max_videos]
            
            next_page_token = response.get("nextPageToken")
            if not next_page_token:
                break
        except HttpError as e:
            print(f"API Error fetching video IDs: {e.content}")
            break
    return video_ids[:max_videos]
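
# Each playlistItems.list page returns up to 50 items for 1 quota unit, so even
# a long scan stays cheap relative to the initial handle-to-ID search.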

def process_video(service, video_id, keywords_set):
    """Processes a video for keyword mentions and links."""
    video_url = f"https://www.youtube.com/watch?v={video_id}"
    result = {
        "video_id": video_id,
        "video_url": video_url,
        "title": f"Video ID: {video_id}",
        "transcript_mentions": set(),
        "description_mentions": set(),
        "description_links": []
    }

    try:
        video_response = service.videos().list(id=video_id, part="snippet").execute()
        if video_response.get("items"):
            snippet = video_response["items"][0]["snippet"]
            result["title"] = snippet.get("title", f"Video ID: {video_id}")
            description = snippet.get("description", "").lower()
            for keyword in keywords_set:
                if keyword in description:
                    result["description_mentions"].add(keyword)
            result["description_links"] = re.findall(r'https?://\S+', snippet.get("description", ""))
    except HttpError as e:
        print(f"API error getting details for {video_id}: {e.resp.status}")

    # Fall back to the transcript only when the description had no hits;
    # transcript fetching is the slow path.
    if not result["description_mentions"]:
        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            # find_transcript raises NoTranscriptFound when nothing matches,
            # so no None check is needed on the result.
            transcript = transcript_list.find_transcript(['en', 'en-US', 'en-GB'])
            full_transcript = transcript.fetch()
            transcript_text = " ".join(segment['text'] for segment in full_transcript).lower()
            for keyword in keywords_set:
                if keyword in transcript_text:
                    result["transcript_mentions"].add(keyword)
        except (TranscriptsDisabled, NoTranscriptFound):
            pass  # No usable English transcript for this video
        except Exception as e:
            print(f"Error fetching transcript for {video_id}: {type(e).__name__}")

    if result["transcript_mentions"] or result["description_mentions"]:
        return result
    return None
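
# Return-shape sketch (hypothetical values):
#   {"video_id": "abc123", "video_url": "https://www.youtube.com/watch?v=abc123",
#    "title": "...", "transcript_mentions": {"3d"}, "description_mentions": set(),
#    "description_links": ["https://example.com"]}
# or None when no keyword matched in either place.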

# --- Main Function ---

def scan_channel_videos(channel_handle, keywords_str, days_filter, max_videos, progress=gr.Progress(track_tqdm=True)):
    """Scans a YouTube channel for keyword mentions and links with user-defined filters."""
    start_time = time.time()
    status_log = []
    results = []

    def log_status(message):
        """Appends a message to the log and yields a partial UI update."""
        print(message)
        status_log.append(message)
        yield "\n".join(status_log), gr.Markdown("### Processing..."), None

    try:
        yield from log_status("1. Initializing YouTube Service...")
        service = get_youtube_service()

        yield from log_status(f"2. Finding Channel ID for '{channel_handle}'...")
        channel_id = get_channel_id(service, channel_handle)
        yield from log_status(f"   Found Channel ID: {channel_id}")

        yield from log_status("3. Fetching Uploads Playlist ID...")
        playlist_id = get_uploads_playlist_id(service, channel_id)
        yield from log_status(f"   Found Playlist ID: {playlist_id}")

        keywords_list = [k.strip().lower() for k in keywords_str.split(',') if k.strip()]
        if not keywords_list:
            raise ValueError("At least one keyword is required.")
        keywords_set = set(keywords_list)
        
        days_filter = int(days_filter) if days_filter else DEFAULT_DAYS
        max_videos = int(max_videos) if max_videos else DEFAULT_MAX_VIDEOS
        if days_filter < 1:
            raise ValueError("Days filter must be at least 1.")
        if max_videos < 1:
            raise ValueError("Max videos must be at least 1.")

        yield from log_status(f"4. Fetching Video IDs with filters (last {days_filter} days, max {max_videos} videos)...")
        video_ids = get_all_video_ids(service, playlist_id, keywords_set, days_filter, max_videos)
        if not video_ids:
            yield from log_status("   No videos found matching filters.")
            yield "\n".join(status_log), gr.Markdown("### Error\nNo videos found matching filters."), None
            return
        yield from log_status(f"   Found {len(video_ids)} videos after filtering.")

        yield from log_status(f"5. Scanning {len(video_ids)} videos for keywords: {', '.join(keywords_list)}...")
        for video_id in progress.tqdm(video_ids, desc="Scanning Videos"):
            result = process_video(service, video_id, keywords_set)
            if result:
                results.append(result)
                yield from log_status(f"   Found mentions in: {result['title']} - {result['video_url']} ({video_id})")

        yield from log_status("\n6. Formatting Results...")
        final_md = f"""
## Scan Results for {channel_handle}

**Searched Keywords**: {', '.join(keywords_list)}  
**Videos Found**: {len(results)} out of {len(video_ids)} scanned (filtered from channel total)  
**Scan Duration**: {time.time() - start_time:.2f} seconds  
**Filters Applied**: Last {days_filter} days, max {max_videos} videos

---

"""
        final_text = f"Scan Results for {channel_handle}\n\n"
        final_text += f"Searched Keywords: {', '.join(keywords_list)}\n"
        final_text += f"Videos Found: {len(results)} out of {len(video_ids)} scanned (filtered from channel total)\n"
        final_text += f"Scan Duration: {time.time() - start_time:.2f} seconds\n"
        final_text += f"Filters Applied: Last {days_filter} days, max {max_videos} videos\n\n"

        if not results:
            final_md += "\n**No mentions found for the specified keywords.**"
            final_text += "No mentions found for the specified keywords.\n"
        else:
            for res in sorted(results, key=lambda x: x['title']):
                final_md += f"""
### {res['title']}

- **Video URL**: [{res['video_url']}]({res['video_url']})
"""
                final_text += f"Video: {res['title']}\n"
                final_text += f"Video URL: {res['video_url']}\n"

                if res['transcript_mentions']:
                    mentions = ', '.join(sorted(res['transcript_mentions']))
                    final_md += f"- **Transcript Mentions**: {mentions}\n"
                    final_text += f"Transcript Mentions: {mentions}\n"
                if res['description_mentions']:
                    mentions = ', '.join(sorted(res['description_mentions']))
                    final_md += f"- **Description Mentions**: {mentions}\n"
                    final_text += f"Description Mentions: {mentions}\n"
                if res['description_links']:
                    final_md += "- **Links in Description**:\n"
                    final_text += "Links in Description:\n"
                    for link in res['description_links']:
                        final_md += f"  - [{link}]({link})\n"
                        final_text += f"  - {link}\n"
                final_md += "\n---\n"
                final_text += "\n---\n"

        # Create temporary file with a specific name
        with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='_youtube_scan_results.txt') as temp_file:
            temp_file.write(final_text)
            temp_file_path = temp_file.name

        # download_output is created with visible=False, so the final update
        # must explicitly make the file component visible again.
        yield "\n".join(status_log), gr.Markdown(final_md), gr.File(value=temp_file_path, label="Download Results", visible=True)

    except ValueError as ve:
        yield from log_status(f"Error: {ve}")
        yield "\n".join(status_log), gr.Markdown(f"### Error\n**Input Error:** {ve}"), None
    except ConnectionError as ce:
        yield from log_status(f"Error: {ce}")
        yield "\n".join(status_log), gr.Markdown(f"### Error\n**API Connection Error:** {ce}"), None
    except Exception as e:
        traceback.print_exc()
        yield from log_status(f"Error: {e}")
        yield "\n".join(status_log), gr.Markdown(f"### Error\n**Unexpected Error:** {e}"), None
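
# scan_channel_videos is a generator: Gradio streams each yielded
# (status, markdown, file) tuple into the three output components in order,
# which is what produces the live progress log.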

# --- Gradio Interface ---

with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# YouTube Keyword Scanner")
    gr.Markdown("Search for keywords in YouTube video transcripts and descriptions, with customizable filters and downloadable results.")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## Settings")
            channel_input = gr.Textbox(
                label="Channel Handle or ID",
                placeholder="e.g., @theAIsearch or UCxxxxxxxxxxxxxx",
                value="@theAIsearch"
            )
            keywords_input = gr.Textbox(
                label="Keywords (comma-separated)",
                placeholder="e.g., 3d, blender, maya",
                value=DEFAULT_KEYWORDS
            )
            days_filter_input = gr.Number(
                label="Days to Look Back",
                value=DEFAULT_DAYS,
                minimum=1,
                precision=0,
                info="Filter videos from the last X days"
            )
            max_videos_input = gr.Number(
                label="Max Videos to Scan",
                value=DEFAULT_MAX_VIDEOS,
                minimum=1,
                precision=0,
                info="Limit the number of videos scanned"
            )
            submit_button = gr.Button("Submit", variant="primary")
            clear_button = gr.Button("Clear")

        with gr.Column(scale=2):
            gr.Markdown("## Status & Logs")
            status_output = gr.Textbox(
                label="Scan Progress",
                lines=10,
                max_lines=20,
                interactive=False,
                autoscroll=True
            )
            gr.Markdown("## Results")
            results_output = gr.Markdown(value="Results will appear here.")
            download_output = gr.File(label="Download Results", visible=False)

    submit_button.click(
        fn=scan_channel_videos,
        inputs=[channel_input, keywords_input, days_filter_input, max_videos_input],
        outputs=[status_output, results_output, download_output]
    )
    clear_button.click(
        fn=lambda: ("", "Results cleared.", "", DEFAULT_KEYWORDS, DEFAULT_DAYS, DEFAULT_MAX_VIDEOS, gr.File(value=None, visible=False)),
        inputs=[],
        outputs=[status_output, results_output, channel_input, keywords_input, days_filter_input, max_videos_input, download_output]
    )


# --- Run the App ---
if __name__ == "__main__":
    app.launch(debug=False)
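
# Local usage sketch (assumes this file is saved as app.py and a valid
# YouTube Data API v3 key is available):
#   export YT_API_KEY="your-api-key"
#   python app.py
# Gradio serves the UI at http://127.0.0.1:7860 by default.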