import os
import re
import time
import tempfile
import traceback
from datetime import datetime, timedelta, timezone

import gradio as gr
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
# --- Constants ---
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"
API_KEY = os.environ.get("YT_API_KEY")  # Read from the environment (e.g., a Space secret)
DEFAULT_KEYWORDS = "3d"
DEFAULT_DAYS = 180  # Default to 6 months
DEFAULT_MAX_VIDEOS = 100  # Default to 100 videos
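# To run outside of Spaces, set the key in your shell first (the key value and
# the script filename below are placeholders):
#   export YT_API_KEY="your-api-key"
#   python app.py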
# --- YouTube API Helper Functions ---
def get_youtube_service():
    """Initializes and returns the YouTube API service."""
    if not API_KEY:
        raise ConnectionError("YT_API_KEY environment variable is not set.")
    try:
        return build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION, developerKey=API_KEY, cache_discovery=False)
    except HttpError as e:
        raise ConnectionError(f"Could not connect to YouTube API: {e}")
def get_channel_id(service, handle):
    """Gets the channel ID from a handle or ID."""
    if not handle:
        raise ValueError("Channel handle or ID is required.")
    # Channel IDs are 24 characters and start with "UC"; pass them through as-is.
    if handle.startswith("UC") and len(handle) == 24:
        return handle
    handle = handle if handle.startswith('@') else f"@{handle}"
    try:
        # search.list resolves a handle to a channel, but it is one of the most
        # quota-expensive API calls, so it only runs when an ID wasn't supplied.
        search_response = service.search().list(q=handle, part="id", type="channel", maxResults=1).execute()
        if not search_response.get("items"):
            raise ValueError(f"Channel '{handle}' not found.")
        return search_response["items"][0]["id"]["channelId"]
    except HttpError as e:
        raise ConnectionError(f"API error finding channel ID: {e.content}")
def get_uploads_playlist_id(service, channel_id):
    """Gets the uploads playlist ID."""
    try:
        response = service.channels().list(id=channel_id, part="contentDetails").execute()
        if not response.get("items"):
            raise ValueError(f"No channel details for ID '{channel_id}'.")
        return response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]
    except HttpError as e:
        raise ConnectionError(f"API error getting uploads playlist: {e.content}")
def get_all_video_ids(service, playlist_id, keywords_set, days_filter, max_videos):
    """Fetches video IDs with pre-filtering by keywords, date, and max limit."""
    video_ids = []
    next_page_token = None
    # publishedAt timestamps from the API are UTC ("...Z"), so build the cutoff
    # in UTC too; the fixed format keeps the string comparison below valid.
    cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days_filter)).strftime("%Y-%m-%dT%H:%M:%SZ")
    while True:
        try:
            response = service.playlistItems().list(
                playlistId=playlist_id,
                part="snippet,contentDetails",
                maxResults=50,
                pageToken=next_page_token
            ).execute()
            for item in response.get("items", []):
                video_id = item["contentDetails"]["videoId"]
                snippet = item["snippet"]
                title = snippet["title"].lower()
                description = snippet.get("description", "").lower()
                published_at = snippet["publishedAt"]
                if published_at < cutoff_date:
                    continue
                if any(keyword in title or keyword in description for keyword in keywords_set):
                    video_ids.append(video_id)
                    if len(video_ids) >= max_videos:
                        return video_ids[:max_videos]
            next_page_token = response.get("nextPageToken")
            if not next_page_token:
                break
        except HttpError as e:
            print(f"API Error fetching video IDs: {e.content}")
            break
    return video_ids[:max_videos]
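# Example (playlist ID and filter values are hypothetical): collect up to 20
# videos from the last 30 days whose title or description mentions "blender":
#   ids = get_all_video_ids(service, "UUxxxxxxxxxxxxxxxxxxxxxx",
#                           {"blender"}, days_filter=30, max_videos=20)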
def process_video(service, video_id, keywords_set):
    """Processes a video for keyword mentions and links."""
    video_url = f"https://www.youtube.com/watch?v={video_id}"
    result = {
        "video_id": video_id,
        "video_url": video_url,
        "title": f"Video ID: {video_id}",
        "transcript_mentions": set(),
        "description_mentions": set(),
        "description_links": []
    }
    try:
        video_response = service.videos().list(id=video_id, part="snippet").execute()
        if video_response.get("items"):
            snippet = video_response["items"][0]["snippet"]
            result["title"] = snippet.get("title", f"Video ID: {video_id}")
            description = snippet.get("description", "").lower()
            for keyword in keywords_set:
                if keyword in description:
                    result["description_mentions"].add(keyword)
            result["description_links"] = re.findall(r'https?://\S+', snippet.get("description", ""))
    except HttpError as e:
        print(f"API error getting details for {video_id}: {e.resp.status}")
    # Only fetch the transcript when the description had no hits, to save time.
    if not result["description_mentions"]:
        try:
            # find_transcript raises NoTranscriptFound rather than returning
            # None, so no extra check is needed after this call.
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            transcript = transcript_list.find_transcript(['en', 'en-US', 'en-GB'])
            full_transcript = transcript.fetch()
            transcript_text = " ".join(segment['text'] for segment in full_transcript).lower()
            for keyword in keywords_set:
                if keyword in transcript_text:
                    result["transcript_mentions"].add(keyword)
        except (TranscriptsDisabled, NoTranscriptFound):
            pass  # No usable transcript for this video.
        except Exception as e:
            print(f"Error fetching transcript for {video_id}: {type(e).__name__}")
    if result["transcript_mentions"] or result["description_mentions"]:
        return result
    return None
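# process_video returns None when nothing matches, otherwise a dict shaped
# like this (field values hypothetical):
#   {"video_id": "abc123XYZ00",
#    "video_url": "https://www.youtube.com/watch?v=abc123XYZ00",
#    "title": "Some title", "transcript_mentions": {"3d"},
#    "description_mentions": set(), "description_links": ["https://example.com"]}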
# --- Main Function ---
def scan_channel_videos(channel_handle, keywords_str, days_filter, max_videos, progress=gr.Progress(track_tqdm=True)):
    """Scans a YouTube channel for keyword mentions and links with user-defined filters."""
    start_time = time.time()
    status_log = []
    results = []

    def log_status(message):
        print(message)
        status_log.append(message)
        yield "\n".join(status_log), gr.Markdown("### Processing..."), None

    try:
        yield from log_status("1. Initializing YouTube Service...")
        service = get_youtube_service()
        yield from log_status(f"2. Finding Channel ID for '{channel_handle}'...")
        channel_id = get_channel_id(service, channel_handle)
        yield from log_status(f" Found Channel ID: {channel_id}")
        yield from log_status("3. Fetching Uploads Playlist ID...")
        playlist_id = get_uploads_playlist_id(service, channel_id)
        yield from log_status(f" Found Playlist ID: {playlist_id}")
        keywords_list = [k.strip().lower() for k in keywords_str.split(',') if k.strip()]
        if not keywords_list:
            raise ValueError("At least one keyword is required.")
        keywords_set = set(keywords_list)
        days_filter = int(days_filter) if days_filter else DEFAULT_DAYS
        max_videos = int(max_videos) if max_videos else DEFAULT_MAX_VIDEOS
        if days_filter < 1:
            raise ValueError("Days filter must be at least 1.")
        if max_videos < 1:
            raise ValueError("Max videos must be at least 1.")
        yield from log_status(f"4. Fetching Video IDs with filters (last {days_filter} days, max {max_videos} videos)...")
        video_ids = get_all_video_ids(service, playlist_id, keywords_set, days_filter, max_videos)
        if not video_ids:
            yield from log_status(" No videos found matching filters.")
            yield "\n".join(status_log), gr.Markdown("### Error\nNo videos found matching filters."), None
            return
        yield from log_status(f" Found {len(video_ids)} videos after filtering.")
        yield from log_status(f"5. Scanning {len(video_ids)} videos for keywords: {', '.join(keywords_list)}...")
        for video_id in progress.tqdm(video_ids, desc="Scanning Videos"):
            result = process_video(service, video_id, keywords_set)
            if result:
                results.append(result)
                yield from log_status(f" Found mentions in: {result['title']} - {result['video_url']} ({video_id})")
        yield from log_status("\n6. Formatting Results...")
final_md = f""" | |
## Scan Results for {channel_handle} | |
**Searched Keywords**: {', '.join(keywords_list)} | |
**Videos Found**: {len(results)} out of {len(video_ids)} scanned (filtered from channel total) | |
**Scan Duration**: {time.time() - start_time:.2f} seconds | |
**Filters Applied**: Last {days_filter} days, max {max_videos} videos | |
--- | |
""" | |
final_text = f"Scan Results for {channel_handle}\n\n" | |
final_text += f"Searched Keywords: {', '.join(keywords_list)}\n" | |
final_text += f"Videos Found: {len(results)} out of {len(video_ids)} scanned (filtered from channel total)\n" | |
final_text += f"Scan Duration: {time.time() - start_time:.2f} seconds\n" | |
final_text += f"Filters Applied: Last {days_filter} days, max {max_videos} videos\n\n" | |
        if not results:
            final_md += "\n**No mentions found for the specified keywords.**"
            final_text += "No mentions found for the specified keywords.\n"
        else:
            for res in sorted(results, key=lambda x: x['title']):
                final_md += f"""
### {res['title']}
- **Video URL**: [{res['video_url']}]({res['video_url']})
"""
                final_text += f"Video: {res['title']}\n"
                final_text += f"Video URL: {res['video_url']}\n"
                if res['transcript_mentions']:
                    mentions = ', '.join(sorted(res['transcript_mentions']))
                    final_md += f"- **Transcript Mentions**: {mentions}\n"
                    final_text += f"Transcript Mentions: {mentions}\n"
                if res['description_mentions']:
                    mentions = ', '.join(sorted(res['description_mentions']))
                    final_md += f"- **Description Mentions**: {mentions}\n"
                    final_text += f"Description Mentions: {mentions}\n"
                if res['description_links']:
                    final_md += "- **Links in Description**:\n"
                    final_text += "Links in Description:\n"
                    for link in res['description_links']:
                        final_md += f"  - [{link}]({link})\n"
                        final_text += f"  - {link}\n"
                final_md += "\n---\n"
                final_text += "\n---\n"
        # Write the plain-text report to a temporary file for download.
        with tempfile.NamedTemporaryFile(delete=False, mode='w', encoding='utf-8', suffix='_youtube_scan_results.txt') as temp_file:
            temp_file.write(final_text)
            temp_file_path = temp_file.name
        # visible=True is required here because the component is created hidden below.
        yield "\n".join(status_log), gr.Markdown(final_md), gr.File(value=temp_file_path, label="Download Results", visible=True)
    except ValueError as ve:
        yield from log_status(f"Error: {ve}")
        yield "\n".join(status_log), gr.Markdown(f"### Error\n**Input Error:** {ve}"), None
    except ConnectionError as ce:
        yield from log_status(f"Error: {ce}")
        yield "\n".join(status_log), gr.Markdown(f"### Error\n**API Connection Error:** {ce}"), None
    except Exception as e:
        traceback.print_exc()
        yield from log_status(f"Error: {e}")
        yield "\n".join(status_log), gr.Markdown(f"### Error\n**Unexpected Error:** {e}"), None
# --- Gradio Interface ---
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# YouTube Keyword Scanner")
    gr.Markdown("Search for keywords in YouTube video transcripts and descriptions, with customizable filters and downloadable results.")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## Settings")
            channel_input = gr.Textbox(
                label="Channel Handle or ID",
                placeholder="e.g., @theAIsearch or UCxxxxxxxxxxxxxx",
                value="@theAIsearch"
            )
            keywords_input = gr.Textbox(
                label="Keywords (comma-separated)",
                placeholder="e.g., 3d, blender, maya",
                value=DEFAULT_KEYWORDS
            )
            days_filter_input = gr.Number(
                label="Days to Look Back",
                value=DEFAULT_DAYS,
                minimum=1,
                precision=0,
                info="Filter videos from the last X days"
            )
            max_videos_input = gr.Number(
                label="Max Videos to Scan",
                value=DEFAULT_MAX_VIDEOS,
                minimum=1,
                precision=0,
                info="Limit the number of videos scanned"
            )
            submit_button = gr.Button("Submit", variant="primary")
            clear_button = gr.Button("Clear")
        with gr.Column(scale=2):
            gr.Markdown("## Status & Logs")
            status_output = gr.Textbox(
                label="Scan Progress",
                lines=10,
                max_lines=20,
                interactive=False,
                autoscroll=True
            )
            gr.Markdown("## Results")
            results_output = gr.Markdown(value="Results will appear here.")
            download_output = gr.File(label="Download Results", visible=False)
    submit_button.click(
        fn=scan_channel_videos,
        inputs=[channel_input, keywords_input, days_filter_input, max_videos_input],
        outputs=[status_output, results_output, download_output]
    )
    clear_button.click(
        # Re-hide the download component on clear, since a successful scan unhides it.
        fn=lambda: ("", "Results cleared.", "", DEFAULT_KEYWORDS, DEFAULT_DAYS, DEFAULT_MAX_VIDEOS, gr.File(value=None, visible=False)),
        inputs=[],
        outputs=[status_output, results_output, channel_input, keywords_input, days_filter_input, max_videos_input, download_output]
    )
# --- Run the App ---
if __name__ == "__main__":
    app.launch(debug=False)