# search-youtuber / app.py
import os
import re
import tempfile
import time
import traceback
from datetime import datetime, timedelta, timezone

import gradio as gr
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
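# Assumed third-party dependencies (not pinned here; pin them in the Space's
# requirements.txt): google-api-python-client, youtube-transcript-api, gradio.
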
# --- Constants ---
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"
API_KEY = os.environ.get("YT_API_KEY")  # Read from the YT_API_KEY env var (e.g., a Space secret)
DEFAULT_KEYWORDS = "3d"
DEFAULT_DAYS = 180 # Default to 6 months
DEFAULT_MAX_VIDEOS = 100 # Default to 100 videos
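
# Quota note (per Google's published quota costs for API v3): search.list
# costs ~100 quota units per call, while channels.list, playlistItems.list,
# and videos.list cost ~1 unit each, so the handle lookup below is the most
# quota-expensive step of a scan.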
# --- YouTube API Helper Functions ---
def get_youtube_service():
    """Initializes and returns the YouTube API service."""
    if not API_KEY:
        raise ValueError("YT_API_KEY environment variable is not set.")
    try:
        return build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION, developerKey=API_KEY, cache_discovery=False)
    except HttpError as e:
        raise ConnectionError(f"Could not connect to YouTube API: {e}")

def get_channel_id(service, handle):
    """Gets the channel ID from a handle or ID."""
    if not handle:
        raise ValueError("Channel handle or ID is required.")
    if handle.startswith("UC") and len(handle) == 24:
        return handle
    handle = handle if handle.startswith('@') else f"@{handle}"
    try:
        search_response = service.search().list(q=handle, part="id", type="channel", maxResults=1).execute()
        if not search_response.get("items"):
            raise ValueError(f"Channel '{handle}' not found.")
        return search_response["items"][0]["id"]["channelId"]
    except HttpError as e:
        raise ConnectionError(f"API error finding channel ID: {e.content}")

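# A lower-quota alternative to the search-based lookup above (assuming the
# channels.list forHandle parameter, added to API v3 in 2024) would be:
#   response = service.channels().list(part="id", forHandle=handle).execute()
#   channel_id = response["items"][0]["id"]
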
def get_uploads_playlist_id(service, channel_id):
    """Gets the uploads playlist ID."""
    try:
        response = service.channels().list(id=channel_id, part="contentDetails").execute()
        if not response.get("items"):
            raise ValueError(f"No channel details for ID '{channel_id}'.")
        return response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]
    except HttpError as e:
        raise ConnectionError(f"API error getting uploads playlist: {e.content}")

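# Shortcut worth knowing: for standard channels the uploads playlist ID is
# typically the channel ID with its "UC" prefix swapped for "UU", though the
# API lookup above is the documented route.
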
def get_all_video_ids(service, playlist_id, keywords_set, days_filter, max_videos):
    """Fetches video IDs with pre-filtering by keywords, date, and max limit."""
    video_ids = []
    next_page_token = None
    # Compare against UTC, since the API returns publishedAt as UTC ISO 8601
    # strings ending in "Z"; strftime keeps the two string formats comparable.
    cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days_filter)).strftime("%Y-%m-%dT%H:%M:%SZ")
    while True:
        try:
            response = service.playlistItems().list(
                playlistId=playlist_id,
                part="snippet,contentDetails",
                maxResults=50,
                pageToken=next_page_token
            ).execute()
            for item in response.get("items", []):
                video_id = item["contentDetails"]["videoId"]
                snippet = item["snippet"]
                title = snippet["title"].lower()
                description = snippet.get("description", "").lower()
                published_at = snippet["publishedAt"]
                if published_at < cutoff_date:
                    continue
                if any(keyword in title or keyword in description for keyword in keywords_set):
                    video_ids.append(video_id)
                    if len(video_ids) >= max_videos:
                        return video_ids[:max_videos]
            next_page_token = response.get("nextPageToken")
            if not next_page_token:
                break
        except HttpError as e:
            print(f"API Error fetching video IDs: {e.content}")
            break
    return video_ids[:max_videos]

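# Possible optimization (not applied above): uploads playlists are generally
# returned newest-first, so once an item falls past the cutoff the remaining
# pages could be skipped entirely, e.g.:
#   if published_at < cutoff_date:
#       return video_ids[:max_videos]
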
def process_video(service, video_id, keywords_set):
    """Processes a video for keyword mentions and links."""
    video_url = f"https://www.youtube.com/watch?v={video_id}"
    result = {
        "video_id": video_id,
        "video_url": video_url,
        "title": f"Video ID: {video_id}",
        "transcript_mentions": set(),
        "description_mentions": set(),
        "description_links": []
    }
    try:
        video_response = service.videos().list(id=video_id, part="snippet").execute()
        if video_response.get("items"):
            snippet = video_response["items"][0]["snippet"]
            result["title"] = snippet.get("title", f"Video ID: {video_id}")
            description = snippet.get("description", "").lower()
            for keyword in keywords_set:
                if keyword in description:
                    result["description_mentions"].add(keyword)
            result["description_links"] = re.findall(r'https?://\S+', snippet.get("description", ""))
    except HttpError as e:
        print(f"API error getting details for {video_id}: {e.resp.status}")
    # Only fetch the transcript when the description had no hits; transcript
    # retrieval is by far the slowest step, so skip it where possible.
    if not result["description_mentions"]:
        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            # find_transcript raises NoTranscriptFound rather than returning
            # None, so no extra truthiness check is needed here.
            transcript = transcript_list.find_transcript(['en', 'en-US', 'en-GB'])
            full_transcript = transcript.fetch()
            transcript_text = " ".join(segment['text'] for segment in full_transcript).lower()
            for keyword in keywords_set:
                if keyword in transcript_text:
                    result["transcript_mentions"].add(keyword)
        except (TranscriptsDisabled, NoTranscriptFound):
            pass  # No usable transcript for this video.
        except Exception as e:
            print(f"Error fetching transcript for {video_id}: {type(e).__name__}")
    if result["transcript_mentions"] or result["description_mentions"]:
        return result
    return None

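# Note: the static YouTubeTranscriptApi.list_transcripts(...) call above
# assumes youtube-transcript-api < 1.0; releases from 1.0 on moved to an
# instance-based interface (roughly YouTubeTranscriptApi().list(video_id)),
# so pin the dependency accordingly.
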
# --- Main Function ---
def scan_channel_videos(channel_handle, keywords_str, days_filter, max_videos, progress=gr.Progress(track_tqdm=True)):
    """Scans a YouTube channel for keyword mentions and links with user-defined filters."""
    start_time = time.time()
    status_log = []
    results = []

    def log_status(message):
        # log_status is itself a generator: it appends to the shared log and
        # yields one (status, results, file) tuple, so callers use
        # `yield from log_status(...)` to push interim UI updates.
        print(message)
        status_log.append(message)
        yield "\n".join(status_log), gr.Markdown("### Processing..."), None

    try:
        yield from log_status("1. Initializing YouTube Service...")
        service = get_youtube_service()
        yield from log_status(f"2. Finding Channel ID for '{channel_handle}'...")
        channel_id = get_channel_id(service, channel_handle)
        yield from log_status(f" Found Channel ID: {channel_id}")
        yield from log_status("3. Fetching Uploads Playlist ID...")
        playlist_id = get_uploads_playlist_id(service, channel_id)
        yield from log_status(f" Found Playlist ID: {playlist_id}")
        keywords_list = [k.strip().lower() for k in keywords_str.split(',') if k.strip()]
        if not keywords_list:
            raise ValueError("At least one keyword is required.")
        keywords_set = set(keywords_list)
        days_filter = int(days_filter) if days_filter else DEFAULT_DAYS
        max_videos = int(max_videos) if max_videos else DEFAULT_MAX_VIDEOS
        if days_filter < 1:
            raise ValueError("Days filter must be at least 1.")
        if max_videos < 1:
            raise ValueError("Max videos must be at least 1.")
        yield from log_status(f"4. Fetching Video IDs with filters (last {days_filter} days, max {max_videos} videos)...")
        video_ids = get_all_video_ids(service, playlist_id, keywords_set, days_filter, max_videos)
        if not video_ids:
            yield from log_status(" No videos found matching filters.")
            yield "\n".join(status_log), gr.Markdown("### Error\nNo videos found matching filters."), None
            return
        yield from log_status(f" Found {len(video_ids)} videos after filtering.")
        yield from log_status(f"5. Scanning {len(video_ids)} videos for keywords: {', '.join(keywords_list)}...")
        for video_id in progress.tqdm(video_ids, desc="Scanning Videos"):
            result = process_video(service, video_id, keywords_set)
            if result:
                results.append(result)
                yield from log_status(f" Found mentions in: {result['title']} - {result['video_url']} ({video_id})")
        yield from log_status("\n6. Formatting Results...")
        final_md = f"""
## Scan Results for {channel_handle}
**Searched Keywords**: {', '.join(keywords_list)}
**Videos Found**: {len(results)} out of {len(video_ids)} scanned (filtered from channel total)
**Scan Duration**: {time.time() - start_time:.2f} seconds
**Filters Applied**: Last {days_filter} days, max {max_videos} videos

---
"""
        final_text = f"Scan Results for {channel_handle}\n\n"
        final_text += f"Searched Keywords: {', '.join(keywords_list)}\n"
        final_text += f"Videos Found: {len(results)} out of {len(video_ids)} scanned (filtered from channel total)\n"
        final_text += f"Scan Duration: {time.time() - start_time:.2f} seconds\n"
        final_text += f"Filters Applied: Last {days_filter} days, max {max_videos} videos\n\n"
        if not results:
            final_md += "\n**No mentions found for the specified keywords.**"
            final_text += "No mentions found for the specified keywords.\n"
        else:
            for res in sorted(results, key=lambda x: x['title']):
                final_md += f"""
### {res['title']}
- **Video URL**: [{res['video_url']}]({res['video_url']})
"""
                final_text += f"Video: {res['title']}\n"
                final_text += f"Video URL: {res['video_url']}\n"
                if res['transcript_mentions']:
                    mentions = ', '.join(sorted(res['transcript_mentions']))
                    final_md += f"- **Transcript Mentions**: {mentions}\n"
                    final_text += f"Transcript Mentions: {mentions}\n"
                if res['description_mentions']:
                    mentions = ', '.join(sorted(res['description_mentions']))
                    final_md += f"- **Description Mentions**: {mentions}\n"
                    final_text += f"Description Mentions: {mentions}\n"
                if res['description_links']:
                    final_md += "- **Links in Description**:\n"
                    final_text += "Links in Description:\n"
                    for link in res['description_links']:
                        final_md += f"  - [{link}]({link})\n"
                        final_text += f"  - {link}\n"
                final_md += "\n---\n"
                final_text += "\n---\n"
        # Write the plain-text report to a temp file for download. delete=False
        # is deliberate: the file must survive past this context manager so
        # Gradio can serve it to the browser.
        with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='_youtube_scan_results.txt') as temp_file:
            temp_file.write(final_text)
            temp_file_path = temp_file.name
        # visible=True is required: the File component is created hidden below.
        yield "\n".join(status_log), gr.Markdown(final_md), gr.File(value=temp_file_path, label="Download Results", visible=True)
    except ValueError as ve:
        yield from log_status(f"Error: {ve}")
        yield "\n".join(status_log), gr.Markdown(f"### Error\n**Input Error:** {ve}"), None
    except ConnectionError as ce:
        yield from log_status(f"Error: {ce}")
        yield "\n".join(status_log), gr.Markdown(f"### Error\n**API Connection Error:** {ce}"), None
    except Exception as e:
        traceback.print_exc()
        yield from log_status(f"Error: {e}")
        yield "\n".join(status_log), gr.Markdown(f"### Error\n**Unexpected Error:** {e}"), None

# --- Gradio Interface ---
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# YouTube Keyword Scanner")
    gr.Markdown("Search for keywords in YouTube video transcripts and descriptions, with customizable filters and downloadable results.")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## Settings")
            channel_input = gr.Textbox(
                label="Channel Handle or ID",
                placeholder="e.g., @theAIsearch or UCxxxxxxxxxxxxxx",
                value="@theAIsearch"
            )
            keywords_input = gr.Textbox(
                label="Keywords (comma-separated)",
                placeholder="e.g., 3d, blender, maya",
                value=DEFAULT_KEYWORDS
            )
            days_filter_input = gr.Number(
                label="Days to Look Back",
                value=DEFAULT_DAYS,
                minimum=1,
                precision=0,
                info="Filter videos from the last X days"
            )
            max_videos_input = gr.Number(
                label="Max Videos to Scan",
                value=DEFAULT_MAX_VIDEOS,
                minimum=1,
                precision=0,
                info="Limit the number of videos scanned"
            )
            submit_button = gr.Button("Submit", variant="primary")
            clear_button = gr.Button("Clear")
        with gr.Column(scale=2):
            gr.Markdown("## Status & Logs")
            status_output = gr.Textbox(
                label="Scan Progress",
                lines=10,
                max_lines=20,
                interactive=False,
                autoscroll=True
            )
            gr.Markdown("## Results")
            results_output = gr.Markdown(value="Results will appear here.")
            # Hidden until a scan yields a file (see visible=True above).
            download_output = gr.File(label="Download Results", visible=False)
    submit_button.click(
        fn=scan_channel_videos,
        inputs=[channel_input, keywords_input, days_filter_input, max_videos_input],
        outputs=[status_output, results_output, download_output]
    )
    clear_button.click(
        # The returned values map positionally onto the outputs list below.
        fn=lambda: ("", "Results cleared.", "", DEFAULT_KEYWORDS, DEFAULT_DAYS, DEFAULT_MAX_VIDEOS, None),
        inputs=[],
        outputs=[status_output, results_output, channel_input, keywords_input, days_filter_input, max_videos_input, download_output]
    )

# --- Run the App ---
if __name__ == "__main__":
    app.launch(debug=False)
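
# Local usage (assuming an API key with YouTube Data API v3 enabled):
#   YT_API_KEY=<your-key> python app.py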