Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -4,57 +4,42 @@ import gradio as gr
|
|
4 |
from googleapiclient.discovery import build
|
5 |
from googleapiclient.errors import HttpError
|
6 |
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
|
7 |
-
import time
|
|
|
8 |
|
9 |
# --- Constants ---
|
10 |
YOUTUBE_API_SERVICE_NAME = "youtube"
|
11 |
YOUTUBE_API_VERSION = "v3"
|
12 |
-
DEFAULT_KEYWORDS =
|
13 |
-
|
14 |
-
)
|
15 |
|
16 |
-
# --- YouTube API Helper Functions
|
17 |
|
18 |
-
|
19 |
-
|
20 |
-
|
21 |
-
# if the same key is used repeatedly, but building it each time is safer.
|
22 |
-
# We will build it each time within the main Gradio function for simplicity & correctness.
|
23 |
-
|
24 |
-
def get_youtube_service(api_key):
|
25 |
-
"""Initializes and returns the YouTube API service."""
|
26 |
-
if not api_key:
|
27 |
raise ValueError("API Key is missing.")
|
28 |
try:
|
29 |
-
# Disable cache discovery for reliability in different environments
|
30 |
-
# Might slightly slow down the first request but avoids potential issues.
|
31 |
service = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION,
|
32 |
-
|
33 |
return service
|
34 |
except HttpError as e:
|
35 |
if e.resp.status == 400:
|
36 |
-
|
37 |
elif e.resp.status == 403:
|
38 |
-
|
39 |
else:
|
40 |
-
|
41 |
except Exception as e:
|
42 |
raise ConnectionError(f"Error building YouTube service: {e}")
|
43 |
|
44 |
-
|
45 |
def get_channel_id(service, channel_identifier):
|
46 |
"""Gets the channel ID using the channel handle or ID."""
|
47 |
if not channel_identifier:
|
48 |
raise ValueError("Channel Handle or ID is missing.")
|
49 |
-
|
50 |
-
# If it looks like a channel ID already
|
51 |
if channel_identifier.startswith("UC") and len(channel_identifier) == 24:
|
52 |
-
# Optional: Verify it's a valid channel ID (API call)
|
53 |
-
# For speed, we'll trust the user if it looks like an ID
|
54 |
print(f"Assuming '{channel_identifier}' is a Channel ID.")
|
55 |
return channel_identifier
|
56 |
-
|
57 |
-
# If it looks like a handle
|
58 |
if channel_identifier.startswith('@'):
|
59 |
handle = channel_identifier
|
60 |
print(f"Attempting to find Channel ID for handle: {handle}")
|
@@ -65,9 +50,8 @@ def get_channel_id(service, channel_identifier):
|
|
65 |
type="channel",
|
66 |
maxResults=1
|
67 |
).execute()
|
68 |
-
|
69 |
if not search_response.get("items"):
|
70 |
-
|
71 |
channel_id = search_response["items"][0]["id"]["channelId"]
|
72 |
print(f"Found Channel ID: {channel_id}")
|
73 |
return channel_id
|
@@ -78,7 +62,6 @@ def get_channel_id(service, channel_identifier):
|
|
78 |
else:
|
79 |
raise ValueError("Invalid Channel Identifier. Use '@handle' or 'UC...' Channel ID.")
|
80 |
|
81 |
-
|
82 |
def get_channel_uploads_playlist_id(service, channel_id):
|
83 |
"""Gets the uploads playlist ID for a given channel ID."""
|
84 |
try:
|
@@ -86,26 +69,22 @@ def get_channel_uploads_playlist_id(service, channel_id):
|
|
86 |
id=channel_id,
|
87 |
part="contentDetails"
|
88 |
).execute()
|
89 |
-
|
90 |
if not channels_response.get("items"):
|
91 |
raise ValueError(f"Could not find channel details for ID '{channel_id}'.")
|
92 |
-
|
93 |
-
playlist_id = channels_response["items"][0].get("contentDetails", {}).get("relatedPlaylists", {}).get("uploads")
|
94 |
if not playlist_id:
|
95 |
-
raise ValueError(f"Could not find
|
96 |
return playlist_id
|
97 |
except HttpError as e:
|
98 |
raise ConnectionError(f"API error getting uploads playlist for {channel_id}: {e.content}")
|
99 |
except Exception as e:
|
100 |
raise Exception(f"Error getting uploads playlist for {channel_id}: {e}")
|
101 |
|
102 |
-
|
103 |
def get_all_video_ids(service, playlist_id):
|
104 |
-
"""Gets all video IDs from a
|
105 |
video_ids = []
|
106 |
next_page_token = None
|
107 |
fetched_count = 0
|
108 |
-
|
109 |
yield f"Fetching video IDs from playlist: {playlist_id}..."
|
110 |
while True:
|
111 |
try:
|
@@ -115,147 +94,99 @@ def get_all_video_ids(service, playlist_id):
|
|
115 |
maxResults=50,
|
116 |
pageToken=next_page_token
|
117 |
).execute()
|
118 |
-
|
119 |
current_page_ids = [item["contentDetails"]["videoId"]
|
120 |
-
|
121 |
-
if item.get("contentDetails", {}).get("videoId")] # Ensure videoId exists
|
122 |
video_ids.extend(current_page_ids)
|
123 |
fetched_count += len(current_page_ids)
|
124 |
-
|
125 |
next_page_token = playlist_items_response.get("nextPageToken")
|
126 |
-
|
127 |
yield f"Fetched {fetched_count} video IDs so far..."
|
128 |
-
|
129 |
if not next_page_token:
|
130 |
break
|
131 |
-
# time.sleep(0.05) # Small delay between page fetches
|
132 |
-
|
133 |
except HttpError as e:
|
134 |
-
# Check for quota error specifically
|
135 |
if e.resp.status == 403:
|
136 |
-
yield f"API Quota Error
|
137 |
-
break
|
138 |
else:
|
139 |
-
yield f"API Error fetching video list
|
140 |
-
break
|
141 |
except Exception as e:
|
142 |
-
yield f"Error fetching video list
|
143 |
break
|
144 |
-
|
145 |
yield f"Finished fetching. Total unique video IDs found: {len(video_ids)}"
|
146 |
return video_ids
|
147 |
|
148 |
-
|
149 |
def process_video(service, video_id, keywords_set):
|
150 |
-
"""Fetches video details
|
151 |
video_url = f"https://www.youtube.com/watch?v={video_id}"
|
152 |
found_data = {
|
153 |
"video_id": video_id,
|
154 |
"video_url": video_url,
|
155 |
-
"title": f"Video ID: {video_id}",
|
156 |
"transcript_mentions": set(),
|
157 |
"description_mentions": set(),
|
158 |
"description_links": []
|
159 |
}
|
160 |
has_mention = False
|
161 |
-
status_updates = []
|
162 |
|
163 |
-
# 1. Get Video Details (Title and Description)
|
164 |
try:
|
165 |
video_response = service.videos().list(
|
166 |
id=video_id,
|
167 |
part="snippet"
|
168 |
).execute()
|
169 |
-
|
170 |
if video_response.get("items"):
|
171 |
-
snippet = video_response["items"][0]
|
172 |
found_data["title"] = snippet.get("title", f"Video ID: {video_id}")
|
173 |
description = snippet.get("description", "").lower()
|
174 |
-
|
175 |
-
# Search description for keywords
|
176 |
for keyword in keywords_set:
|
177 |
-
|
178 |
-
# pattern = r'\b' + re.escape(keyword) + r'\b'
|
179 |
-
# if re.search(pattern, description):
|
180 |
-
if keyword in description: # Simpler check
|
181 |
found_data["description_mentions"].add(keyword)
|
182 |
has_mention = True
|
183 |
-
|
184 |
-
# Extract links from description
|
185 |
found_data["description_links"] = re.findall(r'https?://\S+', snippet.get("description", ""))
|
186 |
else:
|
187 |
-
|
188 |
-
|
189 |
except HttpError as e:
|
190 |
status_updates.append(f" - API error getting details for {video_id}: {e.resp.status}")
|
191 |
except Exception as e:
|
192 |
status_updates.append(f" - Error getting details for {video_id}: {e}")
|
193 |
|
194 |
-
# 2. Get Transcript
|
195 |
transcript_text = ""
|
196 |
try:
|
197 |
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
|
198 |
-
|
199 |
-
transcript = transcript_list.find_transcript(['en', 'en-US', 'en-GB']) # Prefers manual, falls back to generated
|
200 |
-
except NoTranscriptFound:
|
201 |
-
# Try common non-English potentially auto-translatable? Might be too broad.
|
202 |
-
# Let's stick to English for now. User can add keywords in other languages if needed.
|
203 |
-
status_updates.append(f" - No English transcript found for {video_id}.")
|
204 |
-
transcript = None
|
205 |
-
|
206 |
if transcript:
|
207 |
full_transcript = transcript.fetch()
|
208 |
-
# Limit transcript length for processing? Maybe not needed for keyword search.
|
209 |
transcript_text = " ".join([segment['text'] for segment in full_transcript]).lower()
|
210 |
-
|
211 |
-
# Search transcript for keywords
|
212 |
for keyword in keywords_set:
|
213 |
-
|
214 |
-
# if re.search(pattern, transcript_text):
|
215 |
-
if keyword in transcript_text: # Simpler check
|
216 |
found_data["transcript_mentions"].add(keyword)
|
217 |
has_mention = True
|
218 |
-
|
219 |
-
|
220 |
-
status_updates.append(f" - Transcripts disabled for {video_id}")
|
221 |
-
except NoTranscriptFound:
|
222 |
-
# Handled above but catch just in case logic changes
|
223 |
-
status_updates.append(f" - No transcript found (checked again) for {video_id}")
|
224 |
except Exception as e:
|
225 |
-
# Catch potential network errors or library issues
|
226 |
status_updates.append(f" - Error fetching transcript for {video_id}: {type(e).__name__}")
|
227 |
|
228 |
-
if has_mention
|
229 |
-
# Convert sets back to lists for consistency if needed later, but sets are fine for display logic
|
230 |
-
return found_data, status_updates
|
231 |
-
else:
|
232 |
-
return None, status_updates # Return None for data if no mentions
|
233 |
-
|
234 |
|
235 |
# --- Gradio Main Function ---
|
236 |
|
237 |
-
def scan_channel_videos(
|
238 |
-
"""Main function called by Gradio interface."""
|
239 |
start_time = time.time()
|
240 |
status_log = []
|
241 |
results = []
|
242 |
|
243 |
def log_status(message):
|
244 |
-
print(message)
|
245 |
status_log.append(message)
|
246 |
-
|
247 |
-
yield "\n".join(status_log), gr.Markdown("### Processing...") # Keep results empty/processing
|
248 |
|
249 |
try:
|
250 |
yield from log_status("1. Initializing YouTube Service...")
|
251 |
-
|
252 |
-
raise ValueError("API Key is required.")
|
253 |
-
service = get_youtube_service(api_key)
|
254 |
yield from log_status(" Service Initialized.")
|
255 |
|
256 |
yield from log_status(f"2. Finding Channel ID for '{channel_identifier}'...")
|
257 |
-
if not channel_identifier:
|
258 |
-
raise ValueError("Channel Handle or ID is required.")
|
259 |
channel_id = get_channel_id(service, channel_identifier)
|
260 |
yield from log_status(f" Found Channel ID: {channel_id}")
|
261 |
|
@@ -264,62 +195,33 @@ def scan_channel_videos(api_key, channel_identifier, keywords_str, progress=gr.P
|
|
264 |
yield from log_status(f" Found Uploads Playlist ID: {uploads_playlist_id}")
|
265 |
|
266 |
yield from log_status("4. Fetching all Video IDs...")
|
267 |
-
# Use a generator to yield status updates from within get_all_video_ids
|
268 |
video_ids_generator = get_all_video_ids(service, uploads_playlist_id)
|
269 |
video_ids = []
|
270 |
-
|
271 |
-
|
272 |
-
|
273 |
-
|
274 |
-
|
275 |
-
|
276 |
-
video_ids = status_or_list
|
277 |
-
# The last status is already yielded by the function
|
278 |
-
# yield from log_status(f" Finished fetching. Total unique video IDs found: {len(video_ids)}")
|
279 |
-
break
|
280 |
-
except StopIteration:
|
281 |
-
# Generator finished, means it yielded the list last
|
282 |
-
# This case might happen if the generator structure changes, handle defensively
|
283 |
-
if not video_ids: # If we never got the list (e.g., error occurred)
|
284 |
-
yield from log_status(" Warning: Video ID fetching may have stopped unexpectedly.")
|
285 |
-
break # Exit loop
|
286 |
-
except Exception as e:
|
287 |
-
yield from log_status(f" Error during video ID fetching: {e}")
|
288 |
-
break # Stop if error occurs
|
289 |
-
|
290 |
|
291 |
if not video_ids:
|
292 |
-
yield from log_status(" No video IDs found or fetching failed.
|
293 |
-
|
294 |
-
yield "\n".join(status_log), gr.Markdown("### Scan Stopped\nNo videos found or API error during fetch.")
|
295 |
return
|
296 |
|
297 |
-
# Prepare keywords
|
298 |
keywords_list = [k.strip().lower() for k in keywords_str.split(',') if k.strip()]
|
299 |
if not keywords_list:
|
300 |
raise ValueError("Please provide at least one keyword.")
|
301 |
-
keywords_set = set(keywords_list)
|
302 |
yield from log_status(f"5. Scanning {len(video_ids)} videos for keywords: {', '.join(keywords_list)}")
|
303 |
|
304 |
total_videos = len(video_ids)
|
305 |
-
processed_count = 0
|
306 |
-
# Process videos with progress tracking
|
307 |
for video_id in progress.tqdm(video_ids, desc="Scanning Videos"):
|
308 |
-
processed_count += 1
|
309 |
video_result, video_statuses = process_video(service, video_id, keywords_set)
|
310 |
-
# Log minor statuses only if needed for debugging, keep main log cleaner
|
311 |
-
# for status in video_statuses:
|
312 |
-
# yield from log_status(status) # This can make the log very long
|
313 |
-
|
314 |
if video_result:
|
315 |
results.append(video_result)
|
316 |
yield from log_status(f" Found mention in: {video_result['title']} ({video_id})")
|
317 |
|
318 |
-
# Add a small delay to be nice to APIs, especially transcript API
|
319 |
-
# time.sleep(0.1) # Adjust as needed
|
320 |
-
|
321 |
-
|
322 |
-
# 6. Format Results
|
323 |
yield from log_status("\n6. Scan Complete. Formatting results...")
|
324 |
final_md = f"## Scan Results for {channel_identifier}\n\n"
|
325 |
final_md += f"Searched for keywords: `{', '.join(keywords_list)}`\n"
|
@@ -331,75 +233,56 @@ def scan_channel_videos(api_key, channel_identifier, keywords_str, progress=gr.P
|
|
331 |
final_md += "\n**No mentions found for the specified keywords.**"
|
332 |
else:
|
333 |
for res in results:
|
334 |
-
final_md += f"\n### [{res['title']}]({res['video_url']})\n"
|
335 |
-
final_md += f"*Video URL: <{res['video_url']}>*\n\n"
|
336 |
if res['transcript_mentions']:
|
337 |
-
|
338 |
-
|
339 |
if res['description_mentions']:
|
340 |
-
|
341 |
-
|
342 |
if res['description_links']:
|
343 |
-
|
344 |
-
|
345 |
-
|
346 |
-
|
347 |
-
parts = k.split()
|
348 |
-
possible_keywords.add(parts[0]) # e.g., add 'blender' from 'blender foundation'
|
349 |
-
|
350 |
-
for link in res['description_links']:
|
351 |
-
is_potentially_related = any(pk in link.lower() for pk in possible_keywords if pk)
|
352 |
-
marker = " (*)" if is_potentially_related else ""
|
353 |
-
final_md += f"- <{link}>{marker}\n" # Use angle brackets for auto-linking in Markdown
|
354 |
-
final_md += "\n---\n" # Separator between videos
|
355 |
|
356 |
-
yield "\n".join(status_log), gr.Markdown(final_md)
|
357 |
|
358 |
except ValueError as ve:
|
359 |
yield from log_status(f"Configuration Error: {ve}")
|
360 |
yield "\n".join(status_log), gr.Markdown(f"### Error\n**Input Error:** {ve}")
|
361 |
except ConnectionError as ce:
|
362 |
-
|
363 |
-
|
364 |
except HttpError as he:
|
365 |
yield from log_status(f"API HTTP Error: {he.resp.status} - {he.content}")
|
366 |
-
yield "\n".join(status_log), gr.Markdown(f"### Error\n**API HTTP Error:** Status {he.resp.status}\n{he.content}
|
367 |
except Exception as e:
|
368 |
-
|
369 |
-
traceback.print_exc() # Print full traceback to console for debugging
|
370 |
yield from log_status(f"An unexpected error occurred: {e}")
|
371 |
-
yield "\n".join(status_log), gr.Markdown(f"### Error\n**
|
372 |
-
|
373 |
|
374 |
# --- Gradio Interface Definition ---
|
375 |
|
376 |
with gr.Blocks(theme=gr.themes.Soft()) as app:
|
377 |
gr.Markdown("# YouTube Channel 3D Software Scanner")
|
378 |
-
gr.Markdown("Find mentions of 3D software in video transcripts and descriptions
|
379 |
|
380 |
with gr.Row():
|
381 |
with gr.Column(scale=1):
|
382 |
gr.Markdown("## Settings")
|
383 |
-
api_key_input = gr.Textbox(
|
384 |
-
label="YouTube Data API Key",
|
385 |
-
placeholder="Enter your Google Cloud API Key here",
|
386 |
-
type="password",
|
387 |
-
info="Keep this key secure. Get one from Google Cloud Console."
|
388 |
-
)
|
389 |
channel_input = gr.Textbox(
|
390 |
label="Channel Handle or ID",
|
391 |
-
placeholder="e.g., @theAIsearch or UCxxxxxxxxxxxxxx"
|
392 |
-
info="Use the channel's @handle or its full Channel ID."
|
393 |
)
|
394 |
keywords_input = gr.Textbox(
|
395 |
label="Keywords to Search (comma-separated)",
|
396 |
-
value=DEFAULT_KEYWORDS
|
397 |
-
info="List software or terms like 'Blender', '3D Modeling', 'Unreal Engine'."
|
398 |
)
|
399 |
scan_button = gr.Button("Scan Channel", variant="primary")
|
400 |
clear_button = gr.Button("Clear All")
|
401 |
|
402 |
-
|
403 |
with gr.Column(scale=2):
|
404 |
gr.Markdown("## Status & Logs")
|
405 |
status_output = gr.Textbox(
|
@@ -407,38 +290,29 @@ with gr.Blocks(theme=gr.themes.Soft()) as app:
|
|
407 |
lines=10,
|
408 |
max_lines=20,
|
409 |
interactive=False,
|
410 |
-
autoscroll=True
|
411 |
-
placeholder="Scan status updates will appear here..."
|
412 |
)
|
413 |
gr.Markdown("## Results")
|
414 |
-
results_output = gr.Markdown(
|
415 |
-
value="Results will be displayed here after scanning.",
|
416 |
-
# Allow HTML for links? Markdown should handle <url>
|
417 |
-
)
|
418 |
|
419 |
-
# Button Click Actions
|
420 |
scan_button.click(
|
421 |
fn=scan_channel_videos,
|
422 |
-
inputs=[
|
423 |
-
outputs=[status_output, results_output]
|
424 |
-
# api_name="scan_channel" # Optional: for API usage if needed
|
425 |
)
|
426 |
|
427 |
def clear_outputs():
|
428 |
-
return "", "Results cleared.", "",
|
429 |
|
430 |
clear_button.click(
|
431 |
fn=clear_outputs,
|
432 |
inputs=[],
|
433 |
-
outputs=[status_output, results_output,
|
434 |
)
|
435 |
|
436 |
gr.Markdown("---")
|
437 |
-
gr.Markdown("**
|
438 |
-
|
439 |
|
440 |
# --- Run the App ---
|
441 |
if __name__ == "__main__":
|
442 |
-
# share=True creates a public link (use with caution, especially with API keys)
|
443 |
-
# Set debug=True for more detailed logs in console during development
|
444 |
app.launch(debug=False)
|
|
|
4 |
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
import os
import time
import traceback
|
9 |
|
10 |
# --- Constants ---
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"
DEFAULT_KEYWORDS = "3d, texturing, rigging, vfx, cgi"
# SECURITY: the YouTube Data API key must not be committed with the source.
# It is read from the YOUTUBE_API_KEY environment variable (for a hosted app,
# configure it as a secret). The value is an empty string when the variable is
# unset, which get_youtube_service() rejects with "API Key is missing.".
# Any key that was previously committed in this file should be revoked.
API_KEY = os.environ.get("YOUTUBE_API_KEY", "")
|
|
|
15 |
|
16 |
+
# --- YouTube API Helper Functions ---
|
17 |
|
18 |
+
def get_youtube_service(api_key=None):
    """Build and return a YouTube Data API client.

    Args:
        api_key: Optional developer key. When omitted or empty, the
            module-level ``API_KEY`` is used, so existing zero-argument
            callers behave as before.

    Returns:
        The service object produced by ``googleapiclient.discovery.build``.

    Raises:
        ValueError: If no key is available, or if building the service fails
            with an HTTP 400 or 403 response.
        ConnectionError: For any other failure while building the service.
    """
    key = api_key or API_KEY
    if not key:
        raise ValueError("API Key is missing.")

    try:
        # cache_discovery=False is passed through to build() unchanged.
        return build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION,
                     developerKey=key, cache_discovery=False)
    except HttpError as e:
        # Chain with "from e" so the original HTTP error stays in the traceback.
        status = e.resp.status
        if status == 400:
            raise ValueError(f"Invalid API Key or API not enabled. Error: {e.content}") from e
        if status == 403:
            raise ValueError(f"API Key valid, but Quota Exceeded or Forbidden. Error: {e.content}") from e
        raise ConnectionError(f"Could not connect to YouTube API: {e}") from e
    except Exception as e:
        raise ConnectionError(f"Error building YouTube service: {e}") from e
|
35 |
|
|
|
36 |
def get_channel_id(service, channel_identifier):
|
37 |
"""Gets the channel ID using the channel handle or ID."""
|
38 |
if not channel_identifier:
|
39 |
raise ValueError("Channel Handle or ID is missing.")
|
|
|
|
|
40 |
if channel_identifier.startswith("UC") and len(channel_identifier) == 24:
|
|
|
|
|
41 |
print(f"Assuming '{channel_identifier}' is a Channel ID.")
|
42 |
return channel_identifier
|
|
|
|
|
43 |
if channel_identifier.startswith('@'):
|
44 |
handle = channel_identifier
|
45 |
print(f"Attempting to find Channel ID for handle: {handle}")
|
|
|
50 |
type="channel",
|
51 |
maxResults=1
|
52 |
).execute()
|
|
|
53 |
if not search_response.get("items"):
|
54 |
+
raise ValueError(f"Could not find channel for handle '{handle}'. Check the handle.")
|
55 |
channel_id = search_response["items"][0]["id"]["channelId"]
|
56 |
print(f"Found Channel ID: {channel_id}")
|
57 |
return channel_id
|
|
|
62 |
else:
|
63 |
raise ValueError("Invalid Channel Identifier. Use '@handle' or 'UC...' Channel ID.")
|
64 |
|
|
|
65 |
def get_channel_uploads_playlist_id(service, channel_id):
|
66 |
"""Gets the uploads playlist ID for a given channel ID."""
|
67 |
try:
|
|
|
69 |
id=channel_id,
|
70 |
part="contentDetails"
|
71 |
).execute()
|
|
|
72 |
if not channels_response.get("items"):
|
73 |
raise ValueError(f"Could not find channel details for ID '{channel_id}'.")
|
74 |
+
playlist_id = channels_response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]
|
|
|
75 |
if not playlist_id:
|
76 |
+
raise ValueError(f"Could not find uploads playlist for channel {channel_id}.")
|
77 |
return playlist_id
|
78 |
except HttpError as e:
|
79 |
raise ConnectionError(f"API error getting uploads playlist for {channel_id}: {e.content}")
|
80 |
except Exception as e:
|
81 |
raise Exception(f"Error getting uploads playlist for {channel_id}: {e}")
|
82 |
|
|
|
83 |
def get_all_video_ids(service, playlist_id):
|
84 |
+
"""Gets all video IDs from a playlist, yielding status updates."""
|
85 |
video_ids = []
|
86 |
next_page_token = None
|
87 |
fetched_count = 0
|
|
|
88 |
yield f"Fetching video IDs from playlist: {playlist_id}..."
|
89 |
while True:
|
90 |
try:
|
|
|
94 |
maxResults=50,
|
95 |
pageToken=next_page_token
|
96 |
).execute()
|
|
|
97 |
current_page_ids = [item["contentDetails"]["videoId"]
|
98 |
+
for item in playlist_items_response.get("items", [])]
|
|
|
99 |
video_ids.extend(current_page_ids)
|
100 |
fetched_count += len(current_page_ids)
|
|
|
101 |
next_page_token = playlist_items_response.get("nextPageToken")
|
|
|
102 |
yield f"Fetched {fetched_count} video IDs so far..."
|
|
|
103 |
if not next_page_token:
|
104 |
break
|
|
|
|
|
105 |
except HttpError as e:
|
|
|
106 |
if e.resp.status == 403:
|
107 |
+
yield f"API Quota Error: {e.content}. Stopping early."
|
108 |
+
break
|
109 |
else:
|
110 |
+
yield f"API Error fetching video list: {e.content}. Stopping early."
|
111 |
+
break
|
112 |
except Exception as e:
|
113 |
+
yield f"Error fetching video list: {e}. Stopping early."
|
114 |
break
|
|
|
115 |
yield f"Finished fetching. Total unique video IDs found: {len(video_ids)}"
|
116 |
return video_ids
|
117 |
|
|
|
118 |
# Characters that commonly follow a URL in running text but are not part of it.
_LINK_TRAILING_PUNCTUATION = ".,;:!?\"'"


def _match_keywords(text, keywords):
    """Return the set of keywords that occur as substrings of text."""
    return {keyword for keyword in keywords if keyword in text}


def _extract_links(text):
    """Return http(s) URLs found in text, without trailing sentence punctuation."""
    links = []
    for link in re.findall(r'https?://\S+', text):
        link = link.rstrip(_LINK_TRAILING_PUNCTUATION)
        # A ")" with no matching "(" inside the URL is the end of a
        # parenthesised aside such as "(https://example.com)", not part of it.
        while link.endswith(")") and link.count(")") > link.count("("):
            link = link[:-1].rstrip(_LINK_TRAILING_PUNCTUATION)
        links.append(link)
    return links


def process_video(service, video_id, keywords_set):
    """Search one video's description and English transcript for keywords.

    Args:
        service: YouTube Data API client; only
            ``service.videos().list(...).execute()`` is called on it.
        video_id: ID of the video to inspect.
        keywords_set: Keywords to look for. Matching is a plain substring
            test against lowercased text, so keywords should be lowercase.

    Returns:
        A ``(found_data, status_updates)`` tuple. ``found_data`` is a dict
        with the keys ``video_id``, ``video_url``, ``title``,
        ``transcript_mentions``, ``description_mentions`` and
        ``description_links``, or ``None`` when no keyword matched in either
        the description or the transcript. ``status_updates`` is a list of
        messages describing non-fatal problems (failed API call, missing
        transcript); such failures are recorded rather than raised.
    """
    found_data = {
        "video_id": video_id,
        "video_url": f"https://www.youtube.com/watch?v={video_id}",
        "title": f"Video ID: {video_id}",  # placeholder until the real title is fetched
        "transcript_mentions": set(),
        "description_mentions": set(),
        "description_links": []
    }
    status_updates = []

    # 1. Title and description from the Data API.
    try:
        video_response = service.videos().list(
            id=video_id,
            part="snippet"
        ).execute()
        items = video_response.get("items")
        if items:
            snippet = items[0]["snippet"]
            found_data["title"] = snippet.get("title", f"Video ID: {video_id}")
            # "or" also covers a description that is present but null.
            description = snippet.get("description") or ""
            found_data["description_mentions"] = _match_keywords(description.lower(), keywords_set)
            # Links are taken from the original-case text so URLs stay intact.
            found_data["description_links"] = _extract_links(description)
        else:
            status_updates.append(f"  - Could not retrieve details for video {video_id}")
    except HttpError as e:
        status_updates.append(f"  - API error getting details for {video_id}: {e.resp.status}")
    except Exception as e:
        status_updates.append(f"  - Error getting details for {video_id}: {e}")

    # 2. English transcript. A missing transcript is reported via the
    # NoTranscriptFound / TranscriptsDisabled exceptions handled below, so no
    # separate truthiness check on the transcript object is needed.
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        transcript = transcript_list.find_transcript(['en', 'en-US', 'en-GB'])
        segments = transcript.fetch()
        transcript_text = " ".join(segment['text'] for segment in segments).lower()
        found_data["transcript_mentions"] = _match_keywords(transcript_text, keywords_set)
    except (TranscriptsDisabled, NoTranscriptFound):
        status_updates.append(f"  - No transcript available for {video_id}")
    except Exception as e:
        status_updates.append(f"  - Error fetching transcript for {video_id}: {type(e).__name__}")

    has_mention = bool(found_data["description_mentions"] or found_data["transcript_mentions"])
    return (found_data if has_mention else None), status_updates
|
|
|
|
|
|
|
|
|
|
|
170 |
|
171 |
# --- Gradio Main Function ---
|
172 |
|
173 |
+
def scan_channel_videos(channel_identifier, keywords_str, progress=gr.Progress(track_tqdm=True)):
|
174 |
+
"""Main function called by Gradio interface with hardcoded API key."""
|
175 |
start_time = time.time()
|
176 |
status_log = []
|
177 |
results = []
|
178 |
|
179 |
def log_status(message):
|
180 |
+
print(message)
|
181 |
status_log.append(message)
|
182 |
+
yield "\n".join(status_log), gr.Markdown("### Processing...")
|
|
|
183 |
|
184 |
try:
|
185 |
yield from log_status("1. Initializing YouTube Service...")
|
186 |
+
service = get_youtube_service()
|
|
|
|
|
187 |
yield from log_status(" Service Initialized.")
|
188 |
|
189 |
yield from log_status(f"2. Finding Channel ID for '{channel_identifier}'...")
|
|
|
|
|
190 |
channel_id = get_channel_id(service, channel_identifier)
|
191 |
yield from log_status(f" Found Channel ID: {channel_id}")
|
192 |
|
|
|
195 |
yield from log_status(f" Found Uploads Playlist ID: {uploads_playlist_id}")
|
196 |
|
197 |
yield from log_status("4. Fetching all Video IDs...")
|
|
|
198 |
video_ids_generator = get_all_video_ids(service, uploads_playlist_id)
|
199 |
video_ids = []
|
200 |
+
for status_or_list in video_ids_generator:
|
201 |
+
if isinstance(status_or_list, str):
|
202 |
+
yield from log_status(f" {status_or_list}")
|
203 |
+
else:
|
204 |
+
video_ids = status_or_list
|
205 |
+
break
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
206 |
|
207 |
if not video_ids:
|
208 |
+
yield from log_status(" No video IDs found or fetching failed.")
|
209 |
+
yield "\n".join(status_log), gr.Markdown("### Scan Stopped\nNo videos found.")
|
|
|
210 |
return
|
211 |
|
|
|
212 |
keywords_list = [k.strip().lower() for k in keywords_str.split(',') if k.strip()]
|
213 |
if not keywords_list:
|
214 |
raise ValueError("Please provide at least one keyword.")
|
215 |
+
keywords_set = set(keywords_list)
|
216 |
yield from log_status(f"5. Scanning {len(video_ids)} videos for keywords: {', '.join(keywords_list)}")
|
217 |
|
218 |
total_videos = len(video_ids)
|
|
|
|
|
219 |
for video_id in progress.tqdm(video_ids, desc="Scanning Videos"):
|
|
|
220 |
video_result, video_statuses = process_video(service, video_id, keywords_set)
|
|
|
|
|
|
|
|
|
221 |
if video_result:
|
222 |
results.append(video_result)
|
223 |
yield from log_status(f" Found mention in: {video_result['title']} ({video_id})")
|
224 |
|
|
|
|
|
|
|
|
|
|
|
225 |
yield from log_status("\n6. Scan Complete. Formatting results...")
|
226 |
final_md = f"## Scan Results for {channel_identifier}\n\n"
|
227 |
final_md += f"Searched for keywords: `{', '.join(keywords_list)}`\n"
|
|
|
233 |
final_md += "\n**No mentions found for the specified keywords.**"
|
234 |
else:
|
235 |
for res in results:
|
236 |
+
final_md += f"\n### [{res['title']}]({res['video_url']})\n"
|
237 |
+
final_md += f"*Video URL: <{res['video_url']}>*\n\n"
|
238 |
if res['transcript_mentions']:
|
239 |
+
mentions = ", ".join(sorted(res['transcript_mentions']))
|
240 |
+
final_md += f"**Transcript Mentions:** `{mentions}`\n"
|
241 |
if res['description_mentions']:
|
242 |
+
mentions = ", ".join(sorted(res['description_mentions']))
|
243 |
+
final_md += f"**Description Mentions:** `{mentions}`\n"
|
244 |
if res['description_links']:
|
245 |
+
final_md += f"\n**Links in Description:**\n"
|
246 |
+
for link in res['description_links']:
|
247 |
+
final_md += f"- <{link}>\n"
|
248 |
+
final_md += "\n---\n"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
249 |
|
250 |
+
yield "\n".join(status_log), gr.Markdown(final_md)
|
251 |
|
252 |
except ValueError as ve:
|
253 |
yield from log_status(f"Configuration Error: {ve}")
|
254 |
yield "\n".join(status_log), gr.Markdown(f"### Error\n**Input Error:** {ve}")
|
255 |
except ConnectionError as ce:
|
256 |
+
yield from log_status(f"API Connection Error: {ce}")
|
257 |
+
yield "\n".join(status_log), gr.Markdown(f"### Error\n**API Connection Error:** {ce}")
|
258 |
except HttpError as he:
|
259 |
yield from log_status(f"API HTTP Error: {he.resp.status} - {he.content}")
|
260 |
+
yield "\n".join(status_log), gr.Markdown(f"### Error\n**API HTTP Error:** Status {he.resp.status}\n{he.content}")
|
261 |
except Exception as e:
|
262 |
+
traceback.print_exc()
|
|
|
263 |
yield from log_status(f"An unexpected error occurred: {e}")
|
264 |
+
yield "\n".join(status_log), gr.Markdown(f"### Error\n**Unexpected Error:** {e}")
|
|
|
265 |
|
266 |
# --- Gradio Interface Definition ---
|
267 |
|
268 |
with gr.Blocks(theme=gr.themes.Soft()) as app:
|
269 |
gr.Markdown("# YouTube Channel 3D Software Scanner")
|
270 |
+
gr.Markdown("Find mentions of 3D software in video transcripts and descriptions.")
|
271 |
|
272 |
with gr.Row():
|
273 |
with gr.Column(scale=1):
|
274 |
gr.Markdown("## Settings")
|
|
|
|
|
|
|
|
|
|
|
|
|
275 |
channel_input = gr.Textbox(
|
276 |
label="Channel Handle or ID",
|
277 |
+
placeholder="e.g., @theAIsearch or UCxxxxxxxxxxxxxx"
|
|
|
278 |
)
|
279 |
keywords_input = gr.Textbox(
|
280 |
label="Keywords to Search (comma-separated)",
|
281 |
+
value=DEFAULT_KEYWORDS
|
|
|
282 |
)
|
283 |
scan_button = gr.Button("Scan Channel", variant="primary")
|
284 |
clear_button = gr.Button("Clear All")
|
285 |
|
|
|
286 |
with gr.Column(scale=2):
|
287 |
gr.Markdown("## Status & Logs")
|
288 |
status_output = gr.Textbox(
|
|
|
290 |
lines=10,
|
291 |
max_lines=20,
|
292 |
interactive=False,
|
293 |
+
autoscroll=True
|
|
|
294 |
)
|
295 |
gr.Markdown("## Results")
|
296 |
+
results_output = gr.Markdown(value="Results will be displayed here after scanning.")
|
|
|
|
|
|
|
297 |
|
|
|
298 |
scan_button.click(
|
299 |
fn=scan_channel_videos,
|
300 |
+
inputs=[channel_input, keywords_input],
|
301 |
+
outputs=[status_output, results_output]
|
|
|
302 |
)
|
303 |
|
304 |
def clear_outputs():
|
305 |
+
return "", "Results cleared.", "", DEFAULT_KEYWORDS
|
306 |
|
307 |
clear_button.click(
|
308 |
fn=clear_outputs,
|
309 |
inputs=[],
|
310 |
+
outputs=[status_output, results_output, channel_input, keywords_input]
|
311 |
)
|
312 |
|
313 |
gr.Markdown("---")
|
314 |
+
gr.Markdown("**Note:** Scans may take time depending on video count and API quotas.")
|
|
|
315 |
|
316 |
# --- Run the App ---
|
317 |
if __name__ == "__main__":
|
|
|
|
|
318 |
app.launch(debug=False)
|