Spaces:
Sleeping
Sleeping
import os
import re
import time
import traceback

import gradio as gr
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
# --- Constants ---
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"
# SECURITY: never commit a real API key to source control. The key is read from
# the YOUTUBE_API_KEY environment variable (e.g. a deployment secret). Any key
# that was previously hard-coded here must be treated as leaked and revoked.
API_KEY = os.environ.get("YOUTUBE_API_KEY", "")
CHANNEL_HANDLE = "@theAIsearch"
# Common 3D software keywords (all lower-case; matching is done on lower-cased text)
KEYWORDS = [
    "3d", "blender", "maya", "3ds max", "cinema 4d", "houdini", "zbrush", "unreal engine",
    "unity", "substance painter", "substance designer", "v-ray", "arnold", "rendering",
    "texturing", "rigging", "vfx", "cgi", "autodesk", "fusion 360",
]
| # --- YouTube API Helper Functions --- | |
def get_youtube_service():
    """Initialize and return the YouTube Data API client.

    Returns:
        The service object produced by ``googleapiclient.discovery.build``.

    Raises:
        ConnectionError: If no API key is configured, or if building the
            client fails with an ``HttpError``.
    """
    # Fail early with a clear message instead of letting every later request
    # be rejected because of an empty key.
    if not API_KEY:
        raise ConnectionError("YouTube API key is not configured.")
    try:
        return build(
            YOUTUBE_API_SERVICE_NAME,
            YOUTUBE_API_VERSION,
            developerKey=API_KEY,
            cache_discovery=False,
        )
    except HttpError as e:
        # Chain the cause so the original HTTP failure stays in the traceback.
        raise ConnectionError(f"Could not connect to YouTube API: {e}") from e
def get_channel_id(service, handle):
    """Resolve a channel handle (or free-text channel name) to a channel ID.

    Values starting with ``@`` are first resolved exactly through
    ``channels.list(forHandle=...)``. If that yields nothing, or the installed
    client does not know the ``forHandle`` parameter, the function falls back
    to the fuzzy ``search.list`` lookup and takes its first channel hit.

    Args:
        service: YouTube Data API client.
        handle: Channel handle such as ``"@name"``, or a search phrase.

    Returns:
        The channel ID string.

    Raises:
        ValueError: If no channel could be found.
        ConnectionError: If the API responds with an ``HttpError``.
    """
    channel_id = None
    try:
        if handle.startswith("@"):
            try:
                request = service.channels().list(forHandle=handle, part="id")
            except TypeError:
                # The client rejects unknown parameters with TypeError; older
                # discovery documents do not include forHandle.
                request = None
            if request is not None:
                exact_items = request.execute().get("items") or []
                if exact_items:
                    channel_id = exact_items[0]["id"]
        if channel_id is None:
            search_response = service.search().list(
                q=handle, part="id", type="channel", maxResults=1
            ).execute()
            search_items = search_response.get("items") or []
            if search_items:
                channel_id = search_items[0]["id"]["channelId"]
    except HttpError as e:
        raise ConnectionError(f"API error finding channel ID: {e.content}") from e
    # Raised outside the try block: "not found" is not an API failure.
    if channel_id is None:
        raise ValueError(f"Channel '{handle}' not found.")
    return channel_id
def get_uploads_playlist_id(service, channel_id):
    """Return the ID of the channel's "uploads" playlist.

    Args:
        service: YouTube Data API client.
        channel_id: ID of the channel to inspect.

    Returns:
        The uploads playlist ID taken from
        ``contentDetails.relatedPlaylists.uploads`` of the first result.

    Raises:
        ValueError: If the API returns no items for ``channel_id``.
        ConnectionError: If the API responds with an ``HttpError``.
    """
    # Only the network call lives in the try block; the lookup failure below
    # is a data problem, not an API error.
    try:
        response = service.channels().list(id=channel_id, part="contentDetails").execute()
    except HttpError as e:
        raise ConnectionError(f"API error getting uploads playlist: {e.content}") from e
    items = response.get("items")
    if not items:
        raise ValueError(f"No channel details for ID '{channel_id}'.")
    return items[0]["contentDetails"]["relatedPlaylists"]["uploads"]
def get_all_video_ids(service, playlist_id):
    """Collect the video IDs of every item in a playlist, page by page.

    Requests pages of up to 50 items and follows ``nextPageToken`` until the
    API stops returning one. If a request fails with ``HttpError`` the error
    is printed and whatever was collected so far is returned.

    Args:
        service: YouTube Data API client.
        playlist_id: ID of the playlist to walk (e.g. a channel's uploads).

    Returns:
        A list of video ID strings in the order the API returned them.
    """
    collected = []
    page_token = None
    more_pages = True
    while more_pages:
        try:
            page = service.playlistItems().list(
                playlistId=playlist_id,
                part="contentDetails",
                maxResults=50,
                pageToken=page_token,
            ).execute()
            for entry in page.get("items", []):
                collected.append(entry["contentDetails"]["videoId"])
            page_token = page.get("nextPageToken")
            # A missing/empty token marks the last page.
            more_pages = bool(page_token)
        except HttpError as e:
            print(f"API Error fetching video IDs: {e.content}")
            more_pages = False
    return collected
_URL_PATTERN = re.compile(r'https?://\S+')


def _find_keyword_mentions(text, keywords):
    """Return the keywords that occur in ``text`` as whole words or phrases."""
    # Plain substring tests report "unity" for "community"; require that the
    # match is not glued to a word character on either side.
    return {
        keyword
        for keyword in keywords
        if keyword and re.search(rf"(?<!\w){re.escape(keyword)}(?!\w)", text)
    }


def _clean_url(url):
    """Strip sentence punctuation that the greedy URL pattern swept up."""
    while url:
        last = url[-1]
        if last in ".,;:!?\"'":
            url = url[:-1]
        elif last == ")" and url.count("(") < url.count(")"):
            # Unbalanced ")" belongs to surrounding prose, not to the URL.
            url = url[:-1]
        else:
            break
    return url


def _segment_text(segment):
    """Return the text of one transcript segment (dict or object form)."""
    # NOTE(review): older youtube_transcript_api releases yield dicts, newer
    # ones are understood to yield snippet objects with a ``text`` attribute —
    # confirm against the installed version.
    if isinstance(segment, dict):
        return segment.get("text", "")
    return getattr(segment, "text", "")


def process_video(service, video_id, keywords_set):
    """Scan one video's description and English transcript for keywords.

    Args:
        service: YouTube Data API client.
        video_id: ID of the video to inspect.
        keywords_set: Lower-case keywords/phrases to look for.

    Returns:
        A dict with ``video_id``, ``video_url``, ``title``,
        ``transcript_mentions`` (set), ``description_mentions`` (set) and
        ``description_links`` (list) when at least one keyword was found in
        the transcript or description; otherwise ``None``.
    """
    video_url = f"https://www.youtube.com/watch?v={video_id}"
    result = {
        "video_id": video_id,
        "video_url": video_url,
        "title": f"Video ID: {video_id}",
        "transcript_mentions": set(),
        "description_mentions": set(),
        "description_links": []
    }
    # Fetch video details
    try:
        video_response = service.videos().list(id=video_id, part="snippet").execute()
        if video_response.get("items"):
            snippet = video_response["items"][0]["snippet"]
            result["title"] = snippet.get("title", f"Video ID: {video_id}")
            description = snippet.get("description", "") or ""
            result["description_mentions"] = _find_keyword_mentions(
                description.lower(), keywords_set
            )
            result["description_links"] = [
                _clean_url(raw_url) for raw_url in _URL_PATTERN.findall(description)
            ]
    except HttpError as e:
        print(f"API error getting details for {video_id}: {e.resp.status}")
    # Fetch transcript
    try:
        # NOTE(review): the static ``list_transcripts`` helper is the legacy
        # entry point; newer releases are understood to expose an instance
        # method ``list`` instead — confirm against the installed version.
        if hasattr(YouTubeTranscriptApi, "list_transcripts"):
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        else:
            transcript_list = YouTubeTranscriptApi().list(video_id)
        transcript = transcript_list.find_transcript(['en', 'en-US', 'en-GB'])
        if transcript:
            segments = transcript.fetch()
            transcript_text = " ".join(_segment_text(segment) for segment in segments).lower()
            result["transcript_mentions"] = _find_keyword_mentions(transcript_text, keywords_set)
    except (TranscriptsDisabled, NoTranscriptFound):
        pass  # Skip silently if no transcript
    except Exception as e:
        # Best effort: a transcript failure must not abort the whole scan.
        print(f"Error fetching transcript for {video_id}: {type(e).__name__}")
    # Return result only if there are mentions
    if result["transcript_mentions"] or result["description_mentions"]:
        return result
    return None
def _render_results_markdown(results, total_videos, elapsed_seconds):
    """Build the final Markdown report for the scan results."""
    parts = [
        f"## Scan Results for {CHANNEL_HANDLE}\n\n",
        f"Searched for 3D software keywords: `{', '.join(KEYWORDS)}`\n",
        f"Found mentions in **{len(results)}** out of **{total_videos}** videos.\n",
        f"Total time: {elapsed_seconds:.2f} seconds.\n",
        "---\n",
    ]
    if not results:
        parts.append("\n**No mentions of 3D software found.**")
        return "".join(parts)
    for entry in sorted(results, key=lambda item: item['title']):
        parts.append(f"\n### [{entry['title']}]({entry['video_url']})\n")
        parts.append(f"*Video URL: <{entry['video_url']}>*\n")
        if entry['transcript_mentions']:
            parts.append(
                f"**Transcript Mentions:** `{', '.join(sorted(entry['transcript_mentions']))}`\n"
            )
        if entry['description_mentions']:
            parts.append(
                f"**Description Mentions:** `{', '.join(sorted(entry['description_mentions']))}`\n"
            )
        if entry['description_links']:
            parts.append("**Links in Description:**\n")
            parts.extend(f"- <{link}>\n" for link in entry['description_links'])
        parts.append("\n---\n")
    return "".join(parts)


# --- Main Function ---
def scan_channel_videos(progress=gr.Progress(track_tqdm=True)):
    """Scan the configured channel's uploads and stream progress to the UI.

    This is a generator: every yielded value is a ``(log_text, markdown)``
    pair. The log text is the accumulated status log; the second element is a
    "Processing..." placeholder while the scan runs and the final report (or
    an error message) at the end.

    Args:
        progress: Gradio progress tracker used to wrap the per-video loop.
    """
    started_at = time.time()
    log_lines = []
    matches = []

    def log_status(message):
        # Print, record, and emit one UI update containing the whole log so far.
        print(message)
        log_lines.append(message)
        yield "\n".join(log_lines), gr.Markdown("### Processing...")

    try:
        yield from log_status("1. Initializing YouTube Service...")
        service = get_youtube_service()

        yield from log_status(f"2. Finding Channel ID for '{CHANNEL_HANDLE}'...")
        channel_id = get_channel_id(service, CHANNEL_HANDLE)
        yield from log_status(f" Found Channel ID: {channel_id}")

        yield from log_status("3. Fetching Uploads Playlist ID...")
        playlist_id = get_uploads_playlist_id(service, channel_id)
        yield from log_status(f" Found Playlist ID: {playlist_id}")

        yield from log_status("4. Fetching Video IDs...")
        video_ids = get_all_video_ids(service, playlist_id)
        total_videos = len(video_ids)
        if total_videos == 0:
            yield from log_status(" No videos found.")
            yield "\n".join(log_lines), gr.Markdown("### Error\nNo videos found.")
            return
        yield from log_status(f" Found {total_videos} videos.")

        keywords_set = {keyword.lower() for keyword in KEYWORDS}
        yield from log_status(f"5. Scanning {total_videos} videos for 3D software mentions...")
        for video_id in progress.tqdm(video_ids, desc="Scanning Videos"):
            result = process_video(service, video_id, keywords_set)
            if not result:
                continue
            matches.append(result)
            yield from log_status(f" Found mentions in: {result['title']} ({video_id})")

        yield from log_status("\n6. Formatting Results...")
        report = _render_results_markdown(matches, total_videos, time.time() - started_at)
        yield "\n".join(log_lines), gr.Markdown(report)
    except Exception as e:
        # Top-level boundary: surface any failure in the UI instead of crashing.
        traceback.print_exc()
        yield from log_status(f"Error: {e}")
        yield "\n".join(log_lines), gr.Markdown(f"### Error\n**Unexpected Error:** {e}")
# --- Gradio Interface ---
# NOTE(review): the text this block was recovered from had lost its
# indentation; the nesting below (controls and log side by side in one row,
# results underneath) is a best-guess reconstruction — confirm against the
# deployed layout.
with gr.Blocks(theme=gr.themes.Soft()) as app:
    # Page header.
    gr.Markdown("# YouTube 3D Software Scanner for @theAIsearch")
    gr.Markdown("Finds mentions of 3D software in transcripts and links from video descriptions.")

    with gr.Row():
        # Left: action buttons.
        with gr.Column(scale=1):
            scan_button = gr.Button("Scan @theAIsearch", variant="primary")
            clear_button = gr.Button("Clear")
        # Right: live status log.
        with gr.Column(scale=2):
            gr.Markdown("## Status & Logs")
            status_output = gr.Textbox(
                label="Scan Progress",
                interactive=False,
                autoscroll=True,
                lines=10,
                max_lines=20,
            )

    gr.Markdown("## Results")
    results_output = gr.Markdown(value="Results will appear here.")

    # Both handlers write to the same pair of outputs: (log text, report).
    scan_button.click(
        fn=scan_channel_videos,
        inputs=[],
        outputs=[status_output, results_output],
    )
    clear_button.click(
        fn=lambda: ("", "Results cleared."),
        inputs=[],
        outputs=[status_output, results_output],
    )

    gr.Markdown("**Note:** Requires a valid YouTube Data API key.")

# --- Run the App ---
if __name__ == "__main__":
    app.launch(debug=False)