import os import re import gradio as gr from googleapiclient.discovery import build from googleapiclient.errors import HttpError from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound import time import traceback import tempfile from datetime import datetime, timedelta from huggingface_hub import login # --- Constants --- YOUTUBE_API_SERVICE_NAME = "youtube" YOUTUBE_API_VERSION = "v3" API_KEY = os.environ.get("YT_API_KEY") # Replace with your actual YouTube Data API key DEFAULT_KEYWORDS = "3d" DEFAULT_DAYS = 180 # Default to 6 months DEFAULT_MAX_VIDEOS = 100 # Default to 100 videos # --- YouTube API Helper Functions --- def get_youtube_service(): """Initializes and returns the YouTube API service.""" try: return build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION, developerKey=API_KEY, cache_discovery=False) except HttpError as e: raise ConnectionError(f"Could not connect to YouTube API: {e}") def get_channel_id(service, handle): """Gets the channel ID from a handle or ID.""" if not handle: raise ValueError("Channel handle or ID is required.") if handle.startswith("UC") and len(handle) == 24: return handle handle = handle if handle.startswith('@') else f"@{handle}" try: search_response = service.search().list(q=handle, part="id", type="channel", maxResults=1).execute() if not search_response.get("items"): raise ValueError(f"Channel '{handle}' not found.") return search_response["items"][0]["id"]["channelId"] except HttpError as e: raise ConnectionError(f"API error finding channel ID: {e.content}") def get_uploads_playlist_id(service, channel_id): """Gets the uploads playlist ID.""" try: response = service.channels().list(id=channel_id, part="contentDetails").execute() if not response.get("items"): raise ValueError(f"No channel details for ID '{channel_id}'.") return response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"] except HttpError as e: raise ConnectionError(f"API error getting uploads playlist: {e.content}") def get_all_video_ids(service, playlist_id, keywords_set, days_filter, max_videos): """Fetches video IDs with pre-filtering by keywords, date, and max limit.""" video_ids = [] next_page_token = None cutoff_date = (datetime.now() - timedelta(days=days_filter)).isoformat("T") + "Z" while True: try: response = service.playlistItems().list( playlistId=playlist_id, part="snippet,contentDetails", maxResults=50, pageToken=next_page_token ).execute() for item in response.get("items", []): video_id = item["contentDetails"]["videoId"] snippet = item["snippet"] title = snippet["title"].lower() description = snippet.get("description", "").lower() published_at = snippet["publishedAt"] if published_at < cutoff_date: continue if any(keyword in title or keyword in description for keyword in keywords_set): video_ids.append(video_id) if len(video_ids) >= max_videos: return video_ids[:max_videos] next_page_token = response.get("nextPageToken") if not next_page_token: break except HttpError as e: print(f"API Error fetching video IDs: {e.content}") break return video_ids[:max_videos] def process_video(service, video_id, keywords_set): """Processes a video for keyword mentions and links.""" video_url = f"https://www.youtube.com/watch?v={video_id}" result = { "video_id": video_id, "video_url": video_url, "title": f"Video ID: {video_id}", "transcript_mentions": set(), "description_mentions": set(), "description_links": [] } try: video_response = service.videos().list(id=video_id, part="snippet").execute() if video_response.get("items"): snippet = video_response["items"][0]["snippet"] result["title"] = snippet.get("title", f"Video ID: {video_id}") description = snippet.get("description", "").lower() for keyword in keywords_set: if keyword in description: result["description_mentions"].add(keyword) result["description_links"] = re.findall(r'https?://\S+', snippet.get("description", "")) except HttpError as e: print(f"API error getting details for {video_id}: {e.resp.status}") if not result["description_mentions"]: try: transcript_list = YouTubeTranscriptApi.list_transcripts(video_id) transcript = transcript_list.find_transcript(['en', 'en-US', 'en-GB']) if transcript: full_transcript = transcript.fetch() transcript_text = " ".join(segment['text'] for segment in full_transcript).lower() for keyword in keywords_set: if keyword in transcript_text: result["transcript_mentions"].add(keyword) except (TranscriptsDisabled, NoTranscriptFound, Exception) as e: print(f"Error fetching transcript for {video_id}: {type(e).__name__}") if result["transcript_mentions"] or result["description_mentions"]: return result return None # --- Main Function --- def scan_channel_videos(channel_handle, keywords_str, days_filter, max_videos, progress=gr.Progress(track_tqdm=True)): """Scans a YouTube channel for keyword mentions and links with user-defined filters.""" start_time = time.time() status_log = [] results = [] def log_status(message): print(message) status_log.append(message) yield "\n".join(status_log), gr.Markdown("### Processing..."), None try: yield from log_status("1. Initializing YouTube Service...") service = get_youtube_service() yield from log_status(f"2. Finding Channel ID for '{channel_handle}'...") channel_id = get_channel_id(service, channel_handle) yield from log_status(f" Found Channel ID: {channel_id}") yield from log_status(f"3. Fetching Uploads Playlist ID...") playlist_id = get_uploads_playlist_id(service, channel_id) yield from log_status(f" Found Playlist ID: {playlist_id}") keywords_list = [k.strip().lower() for k in keywords_str.split(',') if k.strip()] if not keywords_list: raise ValueError("At least one keyword is required.") keywords_set = set(keywords_list) days_filter = int(days_filter) if days_filter else DEFAULT_DAYS max_videos = int(max_videos) if max_videos else DEFAULT_MAX_VIDEOS if days_filter < 1: raise ValueError("Days filter must be at least 1.") if max_videos < 1: raise ValueError("Max videos must be at least 1.") yield from log_status(f"4. Fetching Video IDs with filters (last {days_filter} days, max {max_videos} videos)...") video_ids = get_all_video_ids(service, playlist_id, keywords_set, days_filter, max_videos) if not video_ids: yield from log_status(" No videos found matching filters.") yield "\n".join(status_log), gr.Markdown("### Error\nNo videos found matching filters."), None return yield from log_status(f" Found {len(video_ids)} videos after filtering.") yield from log_status(f"5. Scanning {len(video_ids)} videos for keywords: {', '.join(keywords_list)}...") for video_id in progress.tqdm(video_ids, desc="Scanning Videos"): result = process_video(service, video_id, keywords_set) if result: results.append(result) yield from log_status(f" Found mentions in: {result['title']} - {result['video_url']} ({video_id})") yield from log_status("\n6. Formatting Results...") final_md = f""" ## Scan Results for {channel_handle} **Searched Keywords**: {', '.join(keywords_list)} **Videos Found**: {len(results)} out of {len(video_ids)} scanned (filtered from channel total) **Scan Duration**: {time.time() - start_time:.2f} seconds **Filters Applied**: Last {days_filter} days, max {max_videos} videos --- """ final_text = f"Scan Results for {channel_handle}\n\n" final_text += f"Searched Keywords: {', '.join(keywords_list)}\n" final_text += f"Videos Found: {len(results)} out of {len(video_ids)} scanned (filtered from channel total)\n" final_text += f"Scan Duration: {time.time() - start_time:.2f} seconds\n" final_text += f"Filters Applied: Last {days_filter} days, max {max_videos} videos\n\n" if not results: final_md += "\n**No mentions found for the specified keywords.**" final_text += "No mentions found for the specified keywords.\n" else: for res in sorted(results, key=lambda x: x['title']): final_md += f""" ### {res['title']} - **Video URL**: [{res['video_url']}]({res['video_url']}) """ final_text += f"Video: {res['title']}\n" final_text += f"Video URL: {res['video_url']}\n" if res['transcript_mentions']: mentions = ', '.join(sorted(res['transcript_mentions'])) final_md += f"- **Transcript Mentions**: {mentions}\n" final_text += f"Transcript Mentions: {mentions}\n" if res['description_mentions']: mentions = ', '.join(sorted(res['description_mentions'])) final_md += f"- **Description Mentions**: {mentions}\n" final_text += f"Description Mentions: {mentions}\n" if res['description_links']: final_md += f"- **Links in Description**:\n" final_text += f"Links in Description:\n" for link in res['description_links']: final_md += f" - [{link}]({link})\n" final_text += f" - {link}\n" final_md += "\n---\n" final_text += "\n---\n" # Create temporary file with a specific name with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='_youtube_scan_results.txt') as temp_file: temp_file.write(final_text) temp_file_path = temp_file.name yield "\n".join(status_log), gr.Markdown(final_md), gr.File(value=temp_file_path, label="Download Results") except ValueError as ve: yield from log_status(f"Error: {ve}") yield "\n".join(status_log), gr.Markdown(f"### Error\n**Input Error:** {ve}"), None except ConnectionError as ce: yield from log_status(f"Error: {ce}") yield "\n".join(status_log), gr.Markdown(f"### Error\n**API Connection Error:** {ce}"), None except Exception as e: traceback.print_exc() yield from log_status(f"Error: {e}") yield "\n".join(status_log), gr.Markdown(f"### Error\n**Unexpected Error:** {e}"), None # --- Gradio Interface --- with gr.Blocks(theme=gr.themes.Soft()) as app: gr.Markdown("# YouTube Keyword Scanner") gr.Markdown("Search for keywords in YouTube video transcripts and descriptions, with customizable filters and downloadable results.") with gr.Row(): with gr.Column(scale=1): gr.Markdown("## Settings") channel_input = gr.Textbox( label="Channel Handle or ID", placeholder="e.g., @theAIsearch or UCxxxxxxxxxxxxxx", value="@theAIsearch" ) keywords_input = gr.Textbox( label="Keywords (comma-separated)", placeholder="e.g., 3d, blender, maya", value=DEFAULT_KEYWORDS ) days_filter_input = gr.Number( label="Days to Look Back", value=DEFAULT_DAYS, minimum=1, precision=0, info="Filter videos from the last X days" ) max_videos_input = gr.Number( label="Max Videos to Scan", value=DEFAULT_MAX_VIDEOS, minimum=1, precision=0, info="Limit the number of videos scanned" ) submit_button = gr.Button("Submit", variant="primary") clear_button = gr.Button("Clear") with gr.Column(scale=2): gr.Markdown("## Status & Logs") status_output = gr.Textbox( label="Scan Progress", lines=10, max_lines=20, interactive=False, autoscroll=True ) gr.Markdown("## Results") results_output = gr.Markdown(value="Results will appear here.") download_output = gr.File(label="Download Results", visible=False) submit_button.click( fn=scan_channel_videos, inputs=[channel_input, keywords_input, days_filter_input, max_videos_input], outputs=[status_output, results_output, download_output] ) clear_button.click( fn=lambda: ("", "Results cleared.", "", DEFAULT_KEYWORDS, DEFAULT_DAYS, DEFAULT_MAX_VIDEOS, None), inputs=[], outputs=[status_output, results_output, channel_input, keywords_input, days_filter_input, max_videos_input, download_output] ) # --- Run the App --- if __name__ == "__main__": app.launch(debug=False)