# NOTE: The lines that preceded this file's code were web-page extraction
# residue (Hugging Face Space header, status, file size, commit hashes, and a
# line-number gutter) — not part of the program; they are preserved here as a
# comment so the module remains valid Python.
# Residue: "Spaces / Sleeping / File size: 13,859 Bytes / ab04f33 795c651 ..."
import os
import re
import tempfile
import time
import traceback
from datetime import datetime, timedelta, timezone

import gradio as gr
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from huggingface_hub import login
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
# --- Constants ---
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"
# The key is read from the environment; set YT_API_KEY (e.g. as a Space
# secret). If unset, API_KEY is None and API calls will fail with an auth error.
API_KEY = os.environ.get("YT_API_KEY")
DEFAULT_KEYWORDS = "3d"
DEFAULT_DAYS = 180  # Default look-back window: 6 months
DEFAULT_MAX_VIDEOS = 100  # Default cap on number of videos scanned
# --- YouTube API Helper Functions ---
def get_youtube_service():
    """Build and return a YouTube Data API v3 client.

    Raises:
        ConnectionError: if the API client cannot be constructed.
    """
    try:
        service = build(
            YOUTUBE_API_SERVICE_NAME,
            YOUTUBE_API_VERSION,
            developerKey=API_KEY,
            cache_discovery=False,
        )
    except HttpError as e:
        raise ConnectionError(f"Could not connect to YouTube API: {e}")
    return service
def get_channel_id(service, handle):
    """Resolve a channel handle to its channel ID (raw IDs pass through).

    Raises:
        ValueError: if the handle is empty or no matching channel exists.
        ConnectionError: on a YouTube API error during the search.
    """
    if not handle:
        raise ValueError("Channel handle or ID is required.")
    # Raw channel IDs start with "UC" and are exactly 24 characters; no
    # API lookup is needed for those.
    if handle.startswith("UC") and len(handle) == 24:
        return handle
    if not handle.startswith('@'):
        handle = f"@{handle}"
    try:
        response = service.search().list(
            q=handle, part="id", type="channel", maxResults=1
        ).execute()
        items = response.get("items")
        if not items:
            raise ValueError(f"Channel '{handle}' not found.")
        return items[0]["id"]["channelId"]
    except HttpError as e:
        raise ConnectionError(f"API error finding channel ID: {e.content}")
def get_uploads_playlist_id(service, channel_id):
    """Return the ID of the channel's "uploads" playlist.

    Raises:
        ValueError: if the channel ID returns no details.
        ConnectionError: on a YouTube API error.
    """
    try:
        data = service.channels().list(id=channel_id, part="contentDetails").execute()
        items = data.get("items")
        if not items:
            raise ValueError(f"No channel details for ID '{channel_id}'.")
        return items[0]["contentDetails"]["relatedPlaylists"]["uploads"]
    except HttpError as e:
        raise ConnectionError(f"API error getting uploads playlist: {e.content}")
def get_all_video_ids(service, playlist_id, keywords_set, days_filter, max_videos):
    """Collect video IDs from a playlist, pre-filtered by keyword, age, and count.

    Args:
        service: YouTube Data API client.
        playlist_id: uploads playlist to page through.
        keywords_set: lowercase keywords; a video matches if any appears in
            its lowercased title or description.
        days_filter: include only videos published within the last N days.
        max_videos: hard cap on the number of IDs returned.

    Returns:
        list[str]: matching video IDs, at most ``max_videos`` long.
    """
    video_ids = []
    next_page_token = None
    # publishedAt values are UTC timestamps like "2024-01-01T00:00:00Z", so
    # the cutoff must be computed in UTC too. (The previous naive
    # datetime.now() was local time mislabelled with "Z", skewing the window
    # by the local UTC offset.) ISO-8601 strings compare lexicographically.
    cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days_filter)).strftime("%Y-%m-%dT%H:%M:%SZ")
    while True:
        try:
            response = service.playlistItems().list(
                playlistId=playlist_id,
                part="snippet,contentDetails",
                maxResults=50,
                pageToken=next_page_token
            ).execute()
            for item in response.get("items", []):
                snippet = item["snippet"]
                # Skip videos older than the look-back window.
                if snippet["publishedAt"] < cutoff_date:
                    continue
                title = snippet["title"].lower()
                description = snippet.get("description", "").lower()
                if any(kw in title or kw in description for kw in keywords_set):
                    video_ids.append(item["contentDetails"]["videoId"])
                    if len(video_ids) >= max_videos:
                        return video_ids[:max_videos]
            next_page_token = response.get("nextPageToken")
            if not next_page_token:
                break
        except HttpError as e:
            # Best-effort: keep whatever IDs were gathered before the error.
            print(f"API Error fetching video IDs: {e.content}")
            break
    return video_ids[:max_videos]
def process_video(service, video_id, keywords_set):
    """Scan one video's description (and, if needed, transcript) for keywords.

    The transcript is only fetched when the description has no keyword
    mentions, saving a slow transcript request per video.

    Args:
        service: YouTube Data API client.
        video_id: the video to inspect.
        keywords_set: lowercase keywords to look for.

    Returns:
        dict with video metadata, mention sets, and description links when at
        least one keyword is mentioned; otherwise None.
    """
    video_url = f"https://www.youtube.com/watch?v={video_id}"
    result = {
        "video_id": video_id,
        "video_url": video_url,
        "title": f"Video ID: {video_id}",
        "transcript_mentions": set(),
        "description_mentions": set(),
        "description_links": []
    }
    try:
        video_response = service.videos().list(id=video_id, part="snippet").execute()
        if video_response.get("items"):
            snippet = video_response["items"][0]["snippet"]
            result["title"] = snippet.get("title", f"Video ID: {video_id}")
            description = snippet.get("description", "").lower()
            result["description_mentions"] = {kw for kw in keywords_set if kw in description}
            # Links are extracted from the original-case description text.
            result["description_links"] = re.findall(r'https?://\S+', snippet.get("description", ""))
    except HttpError as e:
        # Non-fatal: fall through with placeholder title and empty mentions.
        print(f"API error getting details for {video_id}: {e.resp.status}")
    if not result["description_mentions"]:
        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            # find_transcript raises NoTranscriptFound when no language matches,
            # so no extra truthiness check is needed on the result.
            transcript = transcript_list.find_transcript(['en', 'en-US', 'en-GB'])
            transcript_text = " ".join(segment['text'] for segment in transcript.fetch()).lower()
            result["transcript_mentions"] = {kw for kw in keywords_set if kw in transcript_text}
        except Exception as e:
            # Broad catch is deliberate (and matches the original tuple, which
            # already included Exception): transcripts fail in many ways
            # (disabled, missing, network); treat them all as "no mentions".
            print(f"Error fetching transcript for {video_id}: {type(e).__name__}")
    if result["transcript_mentions"] or result["description_mentions"]:
        return result
    return None
# --- Main Function ---
def scan_channel_videos(channel_handle, keywords_str, days_filter, max_videos, progress=gr.Progress(track_tqdm=True)):
    """Scans a YouTube channel for keyword mentions and links with user-defined filters.

    Generator-style Gradio handler: every ``yield`` pushes an updated
    (status log text, results markdown, download file) triple to the three
    output components wired up in the UI.

    Args:
        channel_handle: "@handle" or raw "UC..." channel ID.
        keywords_str: comma-separated, case-insensitive keywords.
        days_filter: look-back window in days (falsy -> DEFAULT_DAYS).
        max_videos: cap on videos scanned (falsy -> DEFAULT_MAX_VIDEOS).
        progress: Gradio progress tracker, injected automatically.
    """
    start_time = time.time()
    status_log = []
    results = []
    # Nested sub-generator: records the message and yields one interim UI
    # update; call sites must use ``yield from`` to forward it to Gradio.
    def log_status(message):
        print(message)
        status_log.append(message)
        yield "\n".join(status_log), gr.Markdown("### Processing..."), None
    try:
        yield from log_status("1. Initializing YouTube Service...")
        service = get_youtube_service()
        yield from log_status(f"2. Finding Channel ID for '{channel_handle}'...")
        channel_id = get_channel_id(service, channel_handle)
        yield from log_status(f" Found Channel ID: {channel_id}")
        yield from log_status(f"3. Fetching Uploads Playlist ID...")
        playlist_id = get_uploads_playlist_id(service, channel_id)
        yield from log_status(f" Found Playlist ID: {playlist_id}")
        keywords_list = [k.strip().lower() for k in keywords_str.split(',') if k.strip()]
        if not keywords_list:
            raise ValueError("At least one keyword is required.")
        keywords_set = set(keywords_list)
        # Gradio Number inputs can arrive as None (or 0); fall back to defaults.
        days_filter = int(days_filter) if days_filter else DEFAULT_DAYS
        max_videos = int(max_videos) if max_videos else DEFAULT_MAX_VIDEOS
        if days_filter < 1:
            raise ValueError("Days filter must be at least 1.")
        if max_videos < 1:
            raise ValueError("Max videos must be at least 1.")
        yield from log_status(f"4. Fetching Video IDs with filters (last {days_filter} days, max {max_videos} videos)...")
        video_ids = get_all_video_ids(service, playlist_id, keywords_set, days_filter, max_videos)
        if not video_ids:
            yield from log_status(" No videos found matching filters.")
            yield "\n".join(status_log), gr.Markdown("### Error\nNo videos found matching filters."), None
            return
        yield from log_status(f" Found {len(video_ids)} videos after filtering.")
        yield from log_status(f"5. Scanning {len(video_ids)} videos for keywords: {', '.join(keywords_list)}...")
        # progress.tqdm drives the Gradio progress bar (track_tqdm=True above).
        for video_id in progress.tqdm(video_ids, desc="Scanning Videos"):
            result = process_video(service, video_id, keywords_set)
            if result:
                results.append(result)
                yield from log_status(f" Found mentions in: {result['title']} - {result['video_url']} ({video_id})")
        yield from log_status("\n6. Formatting Results...")
        # Two parallel renderings: markdown for the UI, plain text for the
        # downloadable file.
        final_md = f"""
## Scan Results for {channel_handle}
**Searched Keywords**: {', '.join(keywords_list)}
**Videos Found**: {len(results)} out of {len(video_ids)} scanned (filtered from channel total)
**Scan Duration**: {time.time() - start_time:.2f} seconds
**Filters Applied**: Last {days_filter} days, max {max_videos} videos
---
"""
        final_text = f"Scan Results for {channel_handle}\n\n"
        final_text += f"Searched Keywords: {', '.join(keywords_list)}\n"
        final_text += f"Videos Found: {len(results)} out of {len(video_ids)} scanned (filtered from channel total)\n"
        final_text += f"Scan Duration: {time.time() - start_time:.2f} seconds\n"
        final_text += f"Filters Applied: Last {days_filter} days, max {max_videos} videos\n\n"
        if not results:
            final_md += "\n**No mentions found for the specified keywords.**"
            final_text += "No mentions found for the specified keywords.\n"
        else:
            for res in sorted(results, key=lambda x: x['title']):
                final_md += f"""
### {res['title']}
- **Video URL**: [{res['video_url']}]({res['video_url']})
"""
                final_text += f"Video: {res['title']}\n"
                final_text += f"Video URL: {res['video_url']}\n"
                if res['transcript_mentions']:
                    mentions = ', '.join(sorted(res['transcript_mentions']))
                    final_md += f"- **Transcript Mentions**: {mentions}\n"
                    final_text += f"Transcript Mentions: {mentions}\n"
                if res['description_mentions']:
                    mentions = ', '.join(sorted(res['description_mentions']))
                    final_md += f"- **Description Mentions**: {mentions}\n"
                    final_text += f"Description Mentions: {mentions}\n"
                if res['description_links']:
                    final_md += f"- **Links in Description**:\n"
                    final_text += f"Links in Description:\n"
                    for link in res['description_links']:
                        final_md += f" - [{link}]({link})\n"
                        final_text += f" - {link}\n"
                final_md += "\n---\n"
                final_text += "\n---\n"
        # Create temporary file with a specific name; delete=False keeps it on
        # disk so Gradio can serve it as a download after the handler returns.
        with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='_youtube_scan_results.txt') as temp_file:
            temp_file.write(final_text)
            temp_file_path = temp_file.name
        yield "\n".join(status_log), gr.Markdown(final_md), gr.File(value=temp_file_path, label="Download Results")
    except ValueError as ve:
        yield from log_status(f"Error: {ve}")
        yield "\n".join(status_log), gr.Markdown(f"### Error\n**Input Error:** {ve}"), None
    except ConnectionError as ce:
        yield from log_status(f"Error: {ce}")
        yield "\n".join(status_log), gr.Markdown(f"### Error\n**API Connection Error:** {ce}"), None
    except Exception as e:
        # Catch-all boundary: log the traceback server-side and surface a
        # readable error in the UI instead of crashing the event handler.
        traceback.print_exc()
        yield from log_status(f"Error: {e}")
        yield "\n".join(status_log), gr.Markdown(f"### Error\n**Unexpected Error:** {e}"), None
# --- Gradio Interface ---
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# YouTube Keyword Scanner")
    gr.Markdown("Search for keywords in YouTube video transcripts and descriptions, with customizable filters and downloadable results.")
    with gr.Row():
        # Left column: scan settings and action buttons.
        with gr.Column(scale=1):
            gr.Markdown("## Settings")
            channel_input = gr.Textbox(
                label="Channel Handle or ID",
                placeholder="e.g., @theAIsearch or UCxxxxxxxxxxxxxx",
                value="@theAIsearch"
            )
            keywords_input = gr.Textbox(
                label="Keywords (comma-separated)",
                placeholder="e.g., 3d, blender, maya",
                value=DEFAULT_KEYWORDS
            )
            days_filter_input = gr.Number(
                label="Days to Look Back",
                value=DEFAULT_DAYS,
                minimum=1,
                precision=0,
                info="Filter videos from the last X days"
            )
            max_videos_input = gr.Number(
                label="Max Videos to Scan",
                value=DEFAULT_MAX_VIDEOS,
                minimum=1,
                precision=0,
                info="Limit the number of videos scanned"
            )
            submit_button = gr.Button("Submit", variant="primary")
            clear_button = gr.Button("Clear")
        # Right column: live status log, rendered results, and download link.
        with gr.Column(scale=2):
            gr.Markdown("## Status & Logs")
            status_output = gr.Textbox(
                label="Scan Progress",
                lines=10,
                max_lines=20,
                interactive=False,
                autoscroll=True
            )
            gr.Markdown("## Results")
            results_output = gr.Markdown(value="Results will appear here.")
            # NOTE(review): starts hidden; the scan yields a gr.File update
            # with a real path — confirm this reveals the component in the
            # Gradio version in use.
            download_output = gr.File(label="Download Results", visible=False)
    # scan_channel_videos is a generator, so outputs stream as it yields.
    submit_button.click(
        fn=scan_channel_videos,
        inputs=[channel_input, keywords_input, days_filter_input, max_videos_input],
        outputs=[status_output, results_output, download_output]
    )
    # Reset every input/output to its default (7 return values map 1:1 onto
    # the 7 components listed in `outputs`).
    clear_button.click(
        fn=lambda: ("", "Results cleared.", "", DEFAULT_KEYWORDS, DEFAULT_DAYS, DEFAULT_MAX_VIDEOS, None),
        inputs=[],
        outputs=[status_output, results_output, channel_input, keywords_input, days_filter_input, max_videos_input, download_output]
    )
# --- Run the App ---
if __name__ == "__main__":
    # Fix: the original line ended with a stray " |" (page-extraction
    # artifact), which is a syntax error.
    app.launch(debug=False)