import os
import tempfile
import gradio as gr
import re
import sys
import shutil
import importlib.util
import time
import random
def check_requirements():
    """Check if all required packages are installed and return status"""
    requirements_status = []
    packages = [
        ('gradio', 'gradio'),
        ('yt-dlp', 'yt_dlp'),
        ('openai-whisper', 'whisper'),
        ('torch', 'torch'),
        ('torchaudio', 'torchaudio'),
        ('numpy', 'numpy'),
        ('regex', 'regex'),
    ]
    for package_name, import_name in packages:
        try:
            spec = importlib.util.find_spec(import_name)
            if spec is None:
                requirements_status.append(f"❌ {package_name}: Not found")
                continue
            module = importlib.import_module(import_name)
            version = getattr(module, '__version__', 'Unknown version')
            requirements_status.append(f"✅ {package_name}: {version}")
        except ImportError as e:
            requirements_status.append(f"❌ {package_name}: Import error - {str(e)}")
        except Exception as e:
            requirements_status.append(f"⚠️ {package_name}: Found but error - {str(e)}")
    # Add Python info
    requirements_status.append(f"\n🐍 Python: {sys.version}")
    requirements_status.append(f"📍 Python executable: {sys.executable}")
    return "\n".join(requirements_status)
# Try to import required packages with error handling
try:
    from yt_dlp import YoutubeDL
    YT_DLP_AVAILABLE = True
except ImportError as e:
    YT_DLP_AVAILABLE = False
    print(f"yt-dlp import error: {e}")

# Try multiple whisper import methods
WHISPER_AVAILABLE = False
WHISPER_TYPE = None
try:
    import whisper
    WHISPER_AVAILABLE = True
    WHISPER_TYPE = "openai-whisper"
    print("Using OpenAI Whisper")
except ImportError as e:
    print(f"OpenAI Whisper import error: {e}")
    try:
        from transformers import pipeline
        WHISPER_AVAILABLE = True
        WHISPER_TYPE = "transformers"
        print("Using Transformers Whisper")
    except ImportError as e2:
        print(f"Transformers Whisper import error: {e2}")

print(f"Python version: {sys.version}")
print(f"Python executable: {sys.executable}")
print(f"yt-dlp available: {YT_DLP_AVAILABLE}")
print(f"whisper available: {WHISPER_AVAILABLE} (type: {WHISPER_TYPE})")
def get_video_info(url, cookies_file_path=None):
    """Get video information without downloading"""
    if not YT_DLP_AVAILABLE:
        raise Exception("yt-dlp is not available.")
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
        'skip_download': True,
    }
    if cookies_file_path and os.path.exists(cookies_file_path):
        ydl_opts['cookiefile'] = cookies_file_path
    with YoutubeDL(ydl_opts) as ydl:
        try:
            info = ydl.extract_info(url, download=False)
            return {
                'title': info.get('title', 'Unknown'),
                'duration': info.get('duration', 0),
                'availability': info.get('availability', 'unknown'),
                'live_status': info.get('live_status', 'unknown'),
            }
        except Exception as e:
            return {'error': str(e)}
def download_audio(url, cookies_file_path=None):
    """Download audio from YouTube URL with enhanced error handling"""
    if not YT_DLP_AVAILABLE:
        raise Exception("yt-dlp is not available. Please check the installation.")
    try:
        # First, try to get video info
        video_info = get_video_info(url, cookies_file_path)
        if 'error' in video_info:
            raise Exception(f"Video info error: {video_info['error']}")
        print(f"Video title: {video_info.get('title', 'Unknown')}")
        print(f"Video duration: {video_info.get('duration', 0)} seconds")
        print(f"Video availability: {video_info.get('availability', 'unknown')}")
        # Create a temporary directory for downloads
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, "audio")
        # Enhanced options for better compatibility
        ydl_opts = {
            'format': 'bestaudio[ext=m4a]/bestaudio[ext=webm]/bestaudio[ext=mp4]/bestaudio/best',
            'outtmpl': output_path + '.%(ext)s',
            'quiet': False,  # Enable logging for debugging
            'no_warnings': False,
            'extractor_retries': 5,
            'fragment_retries': 5,
            'retry_sleep_functions': {'http': lambda n: min(2 ** n, 60)},
            'socket_timeout': 30,
            'http_chunk_size': 10485760,  # 10MB chunks
            'writeinfojson': False,
            'writesubtitles': False,
            'writeautomaticsub': False,
            'geo_bypass': True,
            'geo_bypass_country': 'US',
            'extract_flat': False,
            'ignoreerrors': False,
        }
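        # Note: the 'format' selector tries m4a first, then webm/mp4, then falls back to
        # any available audio stream; 'retry_sleep_functions' backs off exponentially
        # between HTTP retries (2**n seconds, capped at 60s).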
        # Enhanced cookies and headers handling
        if cookies_file_path and os.path.exists(cookies_file_path):
            ydl_opts['cookiefile'] = cookies_file_path
            print(f"✅ Using cookies file: {cookies_file_path}")
        else:
            print("⚠️ No cookies file - using enhanced headers")
        # Always add enhanced browser-style headers ('http_headers' is the yt-dlp option name)
        ydl_opts.update({
            'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'referer': 'https://www.youtube.com/',
            'http_headers': {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9',
                'Accept-Encoding': 'gzip, deflate, br',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
                'Sec-Fetch-User': '?1',
                'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
                'sec-ch-ua-mobile': '?0',
                'sec-ch-ua-platform': '"Windows"',
            }
        })
        # Add random delay to avoid rate limiting
        time.sleep(random.uniform(1, 3))
        with YoutubeDL(ydl_opts) as ydl:
            print(f"Attempting to download audio from: {url}")
            info_dict = ydl.extract_info(url, download=True)
            # Find the downloaded file
            for ext in ['.m4a', '.webm', '.mp4', '.mp3', '.aac', '.opus']:
                potential_file = output_path + ext
                if os.path.exists(potential_file):
                    print(f"✅ Audio downloaded: {potential_file}")
                    return potential_file
            # If no file found, list directory contents for debugging
            print(f"Files in temp directory: {os.listdir(temp_dir)}")
            raise FileNotFoundError("Downloaded audio file not found")
    except Exception as e:
        error_msg = str(e).lower()
        # Provide specific error messages and solutions
        if "video unavailable" in error_msg or "content isn't available" in error_msg:
            raise Exception(f"""
❌ Video Access Error: The video is unavailable or restricted.

Possible reasons:
• Video is private, unlisted, or deleted
• Video is geo-blocked in your region
• Video has age restrictions
• Video requires sign-in to view
• Copyright restrictions

Solutions to try:
1. Verify the video URL is correct and accessible
2. Try a different public video
3. Check if the video works in your browser
4. If using a playlist URL, try the direct video URL instead
5. For age-restricted videos, ensure cookies are from a logged-in account

Original error: {str(e)}
""")
        elif "403" in error_msg or "forbidden" in error_msg:
            raise Exception(f"""
❌ Access Forbidden (403): YouTube blocked the request.

Solutions:
1. **Upload fresh cookies.txt file** (most important)
2. Get cookies from a logged-in YouTube account
3. Try again after a few minutes (rate limiting)
4. Use a different network/VPN if possible

How to get fresh cookies:
• Visit YouTube while logged in
• Use browser extension to export cookies
• Upload the newest cookies.txt file

Original error: {str(e)}
""")
        elif "429" in error_msg or "rate limit" in error_msg:
            raise Exception(f"""
❌ Rate Limited (429): Too many requests.

Solutions:
1. Wait 10-15 minutes before trying again
2. Upload fresh cookies.txt file
3. Try a different video
4. Use a different network if possible

Original error: {str(e)}
""")
        else:
            raise Exception(f"Failed to download audio: {str(e)}")
def transcribe_audio(file_path):
    """Transcribe audio file using Whisper"""
    if not WHISPER_AVAILABLE:
        raise Exception("Whisper is not available. Install it with: pip install openai-whisper (or: pip install transformers torch torchaudio)")
    try:
        if WHISPER_TYPE == "openai-whisper":
            # Use OpenAI Whisper with more robust settings
            model = whisper.load_model("base")  # Use base model for better accuracy
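            # Note: whisper.load_model also accepts "tiny", "small", "medium", and "large";
            # larger models are more accurate but slower and use more memory.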
            result = model.transcribe(
                file_path,
                language="en",  # Specify English for better performance
                task="transcribe",
                verbose=False,
                fp16=False,  # Better compatibility
                temperature=0.0,  # More deterministic
            )
            return result["text"]
        elif WHISPER_TYPE == "transformers":
            # Use Transformers Whisper
            from transformers import pipeline
            transcriber = pipeline(
                "automatic-speech-recognition",
                model="openai/whisper-base",
                device=-1  # Use CPU for better compatibility
            )
            result = transcriber(file_path, return_timestamps=False)
            return result["text"]
        else:
            raise Exception("No compatible Whisper installation found")
    except Exception as e:
        raise Exception(f"Failed to transcribe audio: {str(e)}")
def extract_stock_info_enhanced(text):
    """Enhanced stock information extraction with better patterns"""
    try:
        stock_info = []
        # Enhanced patterns for stock information
        stock_patterns = {
            'symbols': r'\b[A-Z]{2,5}\b(?=\s+(?:stock|shares|ticker|symbol|price|target|buy|sell))',
            'prices': r'\$\d+(?:\.\d{1,2})?(?:\s*(?:per share|each|target|price))?',
            'percentages': r'\d+(?:\.\d{1,2})?%',
            'actions': r'\b(?:buy|sell|hold|long|short|bullish|bearish|target|stop loss|take profit|accumulate|distribute)\b',
            'companies': r'\b[A-Z][a-zA-Z]+(?:\s+[A-Z][a-zA-Z]+){0,2}(?:\s+(?:Inc|Corp|Company|Ltd|LLC)\.?)?',
            'market_terms': r'\b(?:earnings|revenue|profit|loss|growth|dividend|yield|PE ratio|market cap|volume)\b',
        }
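        # The 'symbols' pattern uses a lookahead: a 2-5 letter uppercase token only counts
        # as a ticker when it is immediately followed by a finance keyword such as "stock"
        # or "price". These patterns are heuristics and will miss or over-match some cases.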
        # Extract information (symbols stay case-sensitive so only uppercase tickers match)
        symbols = re.findall(stock_patterns['symbols'], text)
        prices = re.findall(stock_patterns['prices'], text)
        percentages = re.findall(stock_patterns['percentages'], text)
        actions = re.findall(stock_patterns['actions'], text, re.IGNORECASE)
        companies = re.findall(stock_patterns['companies'], text)
        market_terms = re.findall(stock_patterns['market_terms'], text, re.IGNORECASE)
        # Format results
        result = "=== 📊 EXTRACTED STOCK INFORMATION ===\n\n"
        if symbols:
            result += f"🔤 **Stock Symbols Found**: {', '.join(set(symbols[:10]))}\n\n"
        if companies:
            filtered_companies = [c for c in set(companies) if len(c) > 3 and c.upper() not in ['THE', 'AND', 'FOR', 'WITH']]
            if filtered_companies:
                result += f"🏢 **Companies Mentioned**: {', '.join(filtered_companies[:10])}\n\n"
        if prices:
            result += f"💰 **Price Mentions**: {', '.join(set(prices[:10]))}\n\n"
        if percentages:
            result += f"📈 **Percentage Mentions**: {', '.join(set(percentages[:10]))}\n\n"
        if actions:
            result += f"🎯 **Trading Actions**: {', '.join(set(actions[:10]))}\n\n"
        if market_terms:
            result += f"📰 **Market Terms**: {', '.join(set(market_terms[:10]))}\n\n"
        # Look for recommendation sentences
        sentences = [s.strip() for s in text.split('.') if s.strip()]
        recommendations = []
        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(action in sentence_lower for action in ['buy', 'sell', 'target', 'recommend', 'suggest']):
                if any(symbol in sentence for symbol in symbols[:5]) or any(price in sentence for price in prices[:3]):
                    recommendations.append(sentence)
        if recommendations:
            result += "🎯 **Potential Recommendations**:\n"
            for i, rec in enumerate(recommendations[:5], 1):
                result += f"{i}. {rec}\n"
            result += "\n"
        # Add summary
        if not any([symbols, prices, actions, recommendations]):
            result += "⚠️ **No clear stock recommendations found**\n\n"
            result += "**Possible reasons:**\n"
            result += "• Video doesn't contain stock/financial content\n"
            result += "• Audio quality was poor for transcription\n"
            result += "• Content is not in English\n"
            result += "• General market discussion without specific recommendations\n"
        else:
            result += "✅ **Analysis Complete** - Please verify all information independently!\n"
        result += "\n" + "="*50 + "\n"
        result += "⚠️ **DISCLAIMER**: This is automated extraction for educational purposes only.\n"
        result += "Always conduct your own research before making investment decisions!\n"
        result += "="*50
        return result
    except Exception as e:
        return f"❌ Error extracting stock info: {str(e)}"
def cleanup_file(file_path):
    """Clean up temporary files"""
    try:
        if file_path and os.path.exists(file_path):
            os.remove(file_path)
            # Also try to remove the directory if it's empty
            try:
                os.rmdir(os.path.dirname(file_path))
            except OSError:
                pass
    except OSError:
        pass
def process_cookies_file(cookies_file):
    """Process uploaded cookies file and return the path"""
    if cookies_file is None:
        return None
    try:
        # Create a temporary file for cookies (mkstemp avoids the race condition of the deprecated mktemp)
        fd, temp_cookies_path = tempfile.mkstemp(suffix='.txt')
        os.close(fd)
        # Copy the uploaded file
        shutil.copy2(cookies_file, temp_cookies_path)
        # Validate cookies file
        with open(temp_cookies_path, 'r', encoding='utf-8') as f:
            content = f.read()
            if 'youtube.com' not in content.lower():
                print("⚠️ Warning: cookies file might not contain YouTube cookies")
        print(f"✅ Cookies file processed: {temp_cookies_path}")
        return temp_cookies_path
    except Exception as e:
        print(f"❌ Error processing cookies file: {e}")
        return None
def validate_youtube_url(url):
    """Validate YouTube URL format"""
    if not url or not url.strip():
        return False, "Please provide a YouTube URL"
    url = url.strip()
    youtube_patterns = [
        r'(?:https?://)?(?:www\.)?youtube\.com/watch\?v=[\w-]+',
        r'(?:https?://)?(?:www\.)?youtu\.be/[\w-]+',
        r'(?:https?://)?(?:www\.)?youtube\.com/embed/[\w-]+',
        r'(?:https?://)?(?:m\.)?youtube\.com/watch\?v=[\w-]+',
    ]
    for pattern in youtube_patterns:
        if re.match(pattern, url):
            return True, "Valid YouTube URL"
    return False, "Invalid YouTube URL format"
def process_video(url, cookies_file, progress=gr.Progress()):
    """Main function to process YouTube video with detailed debugging"""
    # Detailed debugging info
    debug_info = []
    debug_info.append(f"🚀 Starting process at {time.strftime('%H:%M:%S')}")
    debug_info.append(f"🐍 Python version: {sys.version.split()[0]}")
    debug_info.append(f"📦 yt-dlp available: {YT_DLP_AVAILABLE}")
    debug_info.append(f"🎙️ Whisper available: {WHISPER_AVAILABLE} (type: {WHISPER_TYPE})")
    # Check if required packages are available
    if not YT_DLP_AVAILABLE:
        error_msg = "❌ ERROR: yt-dlp is not installed properly.\n\n"
        error_msg += "SOLUTION: Install yt-dlp using:\n"
        error_msg += "pip install yt-dlp\n\n"
        error_msg += "DEBUG INFO:\n" + "\n".join(debug_info)
        return error_msg, "", "❌ Missing yt-dlp"
    if not WHISPER_AVAILABLE:
        error_msg = "❌ ERROR: OpenAI Whisper is not installed properly.\n\n"
        error_msg += "SOLUTION: Install Whisper using:\n"
        error_msg += "pip install openai-whisper\n"
        error_msg += "OR\n"
        error_msg += "pip install transformers torch torchaudio\n\n"
        error_msg += "DEBUG INFO:\n" + "\n".join(debug_info)
        return error_msg, "", "❌ Missing Whisper"
    # Validate URL
    is_valid, validation_msg = validate_youtube_url(url)
    if not is_valid:
        error_msg = f"❌ ERROR: {validation_msg}\n\n"
        error_msg += f"PROVIDED URL: {url}\n\n"
        error_msg += "VALID URL FORMATS:\n"
        error_msg += "• https://www.youtube.com/watch?v=VIDEO_ID\n"
        error_msg += "• https://youtu.be/VIDEO_ID\n"
        error_msg += "• https://www.youtube.com/embed/VIDEO_ID\n\n"
        error_msg += "DEBUG INFO:\n" + "\n".join(debug_info)
        return error_msg, "", "❌ Invalid URL"
    audio_path = None
    cookies_temp_path = None
    try:
        progress(0.05, desc="🔍 Validating URL...")
        debug_info.append(f"✅ URL validation passed: {url}")
        # Process cookies file if provided
        progress(0.1, desc="🍪 Processing cookies...")
        cookies_temp_path = process_cookies_file(cookies_file)
        if cookies_temp_path:
            debug_info.append(f"✅ Cookies processed: {cookies_temp_path}")
        else:
            debug_info.append("⚠️ No cookies provided - this may cause access errors")
        status_msg = "✅ Cookies loaded" if cookies_temp_path else "⚠️ No cookies (may encounter restrictions)"
        # First, try to get video info for debugging
        progress(0.15, desc="🔍 Checking video accessibility...")
        try:
            video_info = get_video_info(url, cookies_temp_path)
            if 'error' in video_info:
                debug_info.append(f"❌ Video info error: {video_info['error']}")
                raise Exception(f"Video accessibility check failed: {video_info['error']}")
            else:
                debug_info.append(f"✅ Video info: {video_info}")
        except Exception as e:
            # Continue anyway, but log the issue
            debug_info.append(f"❌ Video info check failed: {str(e)}")
        # Download audio
        progress(0.2, desc="📥 Downloading audio...")
        debug_info.append("🚀 Starting audio download...")
        audio_path = download_audio(url, cookies_temp_path)
        debug_info.append(f"✅ Audio downloaded: {audio_path}")
        # Check if audio file exists and get size
        if audio_path and os.path.exists(audio_path):
            file_size = os.path.getsize(audio_path)
            debug_info.append(f"📁 Audio file size: {file_size/1024/1024:.2f} MB")
        else:
            raise Exception("Audio file not found after download")
        # Transcribe audio
        progress(0.6, desc="🎙️ Transcribing audio...")
        debug_info.append("🚀 Starting transcription...")
        transcript = transcribe_audio(audio_path)
        debug_info.append(f"✅ Transcription completed: {len(transcript)} characters")
        if not transcript.strip():
            error_msg = "❌ ERROR: No speech detected in the video\n\n"
            error_msg += "POSSIBLE CAUSES:\n"
            error_msg += "• Video has no audio track\n"
            error_msg += "• Audio is too quiet or unclear\n"
            error_msg += "• Video is not in English\n"
            error_msg += "• Audio file is corrupted\n\n"
            error_msg += "DEBUG INFO:\n" + "\n".join(debug_info)
            return error_msg, "", "❌ No speech detected"
        # Extract stock information
        progress(0.9, desc="📊 Analyzing content...")
        debug_info.append("📊 Starting stock analysis...")
        stock_details = extract_stock_info_enhanced(transcript)
        debug_info.append("✅ Stock analysis completed")
        progress(1.0, desc="✅ Complete!")
        # Add debug info to transcript
        debug_section = "\n\n" + "="*50 + "\n"
        debug_section += "🔍 DEBUG INFORMATION\n"
        debug_section += "="*50 + "\n"
        debug_section += "\n".join(debug_info)
        return transcript + debug_section, stock_details, "✅ Processing completed successfully"
    except Exception as e:
        error_msg = "❌ DETAILED ERROR INFORMATION:\n\n"
        error_msg += f"ERROR MESSAGE: {str(e)}\n\n"
        error_msg += f"ERROR TYPE: {type(e).__name__}\n\n"
        # Add context based on where the error occurred
        if "download" in str(e).lower():
            error_msg += "🔧 DOWNLOAD TROUBLESHOOTING:\n"
            error_msg += "• Check if video URL is accessible in browser\n"
            error_msg += "• Upload fresh cookies.txt file\n"
            error_msg += "• Try a different video\n"
            error_msg += "• Wait 10-15 minutes if rate limited\n\n"
        elif "transcribe" in str(e).lower():
            error_msg += "🔧 TRANSCRIPTION TROUBLESHOOTING:\n"
            error_msg += "• Check if audio file was downloaded properly\n"
            error_msg += "• Ensure video has clear audio\n"
            error_msg += "• Try a shorter video\n\n"
        error_msg += "📋 PROCESSING STEPS COMPLETED:\n"
        error_msg += "\n".join(debug_info)
        return error_msg, "", f"❌ Error: {type(e).__name__}"
    finally:
        # Clean up temporary files
        if audio_path:
            debug_info.append(f"🗑️ Cleaning up: {audio_path}")
            cleanup_file(audio_path)
        if cookies_temp_path:
            debug_info.append(f"🗑️ Cleaning up: {cookies_temp_path}")
            cleanup_file(cookies_temp_path)
# Create Gradio interface optimized for Gradio Cloud
with gr.Blocks(
    title="📈 YouTube Stock Extractor",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1200px;
        margin: auto;
        font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    }
    .status-box {
        padding: 12px;
        border-radius: 8px;
        margin: 10px 0;
        border: 1px solid #ddd;
    }
    .warning-box {
        background-color: #fff3cd;
        border-color: #ffeaa7;
        color: #856404;
    }
    .success-box {
        background-color: #d4edda;
        border-color: #c3e6cb;
        color: #155724;
    }
    .error-box {
        background-color: #f8d7da;
        border-color: #f5c6cb;
        color: #721c24;
    }
    """
) as demo:
gr.Markdown(""" | |
# π YouTube Stock Recommendation Extractor | |
**Extract stock analysis and trading recommendations from YouTube videos using AI** | |
π§ **How it works:** | |
1. **Upload cookies.txt** (essential for avoiding restrictions) | |
2. **Paste YouTube URL** of financial content | |
3. **AI downloads** audio and transcribes using Whisper | |
4. **Extracts** stock symbols, prices, and recommendations | |
β οΈ **Important:** This tool is for educational purposes only. Always do your own research before investing! | |
""") | |
    with gr.Row():
        with gr.Column(scale=1):
            # System check section
            with gr.Group():
                gr.Markdown("### 🔍 System Status")
                check_req_btn = gr.Button(
                    "Check System Requirements",
                    variant="secondary",
                    size="sm"
                )
                requirements_output = gr.Textbox(
                    label="📋 System Requirements Status",
                    lines=8,
                    max_lines=15,
                    interactive=False,
                    visible=False
                )
            # Input section
            with gr.Group():
                gr.Markdown("### 📥 Input")
                # Add a test button first
                test_btn = gr.Button(
                    "🧪 Test System (Click First!)",
                    variant="secondary",
                    size="sm"
                )
                test_output = gr.Textbox(
                    label="🧪 System Test Results",
                    lines=5,
                    visible=False,
                    interactive=False
                )
                # Cookies upload with better instructions
                cookies_input = gr.File(
                    label="🍪 Upload Cookies File (cookies.txt) - HIGHLY RECOMMENDED",
                    file_types=[".txt"],
                    file_count="single"
                )
                with gr.Accordion("📋 How to Get Cookies (Click to expand)", open=False):
                    gr.Markdown("""
**Why cookies are needed:** YouTube blocks most automated requests without proper authentication.

**Step-by-step instructions:**

1. **Install browser extension:**
   - Chrome: "Get cookies.txt LOCALLY" or "cookies.txt"
   - Firefox: "cookies.txt" or "Export Cookies"
2. **Get cookies:**
   - Visit YouTube.com (log in if needed)
   - Click the extension icon
   - Select "Export for youtube.com"
   - Download the cookies.txt file
3. **Upload here:** Use the file upload above

**⚠️ Without cookies, you'll get "403 Forbidden" or "Video unavailable" errors**
""")
                url_input = gr.Textbox(
                    label="📺 YouTube Video URL",
                    placeholder="https://www.youtube.com/watch?v=VIDEO_ID",
                    lines=2,
                    info="Paste the full YouTube video URL here"
                )
                process_btn = gr.Button(
                    "🚀 Extract Stock Information",
                    variant="primary",
                    size="lg"
                )
                # Status display
                status_output = gr.Textbox(
                    label="📋 Status",
                    lines=3,
                    interactive=False,
                    info="Current processing status"
                )
    # Output section
    with gr.Row():
        with gr.Column():
            transcript_output = gr.Textbox(
                label="📝 Full Transcript",
                lines=20,
                max_lines=25,
                show_copy_button=True,
                info="Complete transcription of the video audio"
            )
        with gr.Column():
            stock_info_output = gr.Textbox(
                label="📊 Extracted Stock Information",
                lines=20,
                max_lines=25,
                show_copy_button=True,
                info="Parsed stock symbols, prices, and recommendations"
            )
    # Example and troubleshooting section
    with gr.Row():
        with gr.Column():
            gr.Markdown("### 📋 Example URLs")
            gr.Examples(
                examples=[
                    ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"],
                    ["https://youtu.be/dQw4w9WgXcQ"],
                ],
                inputs=[url_input],
                label="Click to try example URLs (replace with actual financial videos)"
            )
    # Troubleshooting section
    with gr.Accordion("🔧 Troubleshooting Guide", open=False):
        gr.Markdown("""
### Common Issues and Solutions:

**❌ "Video unavailable" or "Content isn't available":**
- Video might be private, deleted, or geo-blocked
- Try a different public financial video
- Verify the URL works in your browser
- Check if video requires age verification

**❌ "403 Forbidden" error:**
- **Upload fresh cookies.txt file** (most common fix)
- Make sure cookies are from a logged-in YouTube account
- Try waiting 10-15 minutes (rate limiting)

**❌ "No speech detected":**
- Video might not have clear audio
- Try videos with clear narration
- Check if video is in English

**❌ "No stock information found":**
- Video might not contain financial content
- Try videos from financial YouTube channels
- Look for videos with stock analysis or recommendations

### Installation Commands:
```bash
# Install all requirements
pip install gradio yt-dlp openai-whisper torch torchaudio

# Alternative whisper installation
pip install transformers torch torchaudio
```

### Best Practices:
- Use videos from reputable financial channels
- Prefer videos under 20 minutes for faster processing
- Ensure clear audio quality
- Always verify extracted information independently
""")
    # Event handlers
    def show_requirements():
        status = check_requirements()
        return gr.update(value=status, visible=True)

    def test_system():
        """Test system components and return detailed status"""
        test_results = []
        test_results.append("🧪 SYSTEM TEST RESULTS")
        test_results.append("="*30)
        # Test imports
        test_results.append(f"✅ yt-dlp: {'Available' if YT_DLP_AVAILABLE else 'NOT AVAILABLE'}")
        test_results.append(f"✅ Whisper: {'Available' if WHISPER_AVAILABLE else 'NOT AVAILABLE'} (Type: {WHISPER_TYPE})")
        # Test yt-dlp functionality
        if YT_DLP_AVAILABLE:
            try:
                from yt_dlp import YoutubeDL
                test_ydl = YoutubeDL({'quiet': True})
                test_results.append("✅ yt-dlp: Can create YoutubeDL instance")
            except Exception as e:
                test_results.append(f"❌ yt-dlp: Error creating instance - {str(e)}")
        # Test Whisper functionality
        if WHISPER_AVAILABLE:
            try:
                if WHISPER_TYPE == "openai-whisper":
                    import whisper
                    test_results.append("✅ Whisper: OpenAI Whisper can be imported")
                elif WHISPER_TYPE == "transformers":
                    from transformers import pipeline
                    test_results.append("✅ Whisper: Transformers Whisper can be imported")
            except Exception as e:
                test_results.append(f"❌ Whisper: Error testing - {str(e)}")
        # Test file operations
        try:
            temp_file = tempfile.mktemp()
            with open(temp_file, 'w') as f:
                f.write("test")
            os.remove(temp_file)
            test_results.append("✅ File operations: Working")
        except Exception as e:
            test_results.append(f"❌ File operations: Error - {str(e)}")
        test_results.append("\n💡 If you see any ❌ errors above, install missing packages:")
        test_results.append("pip install yt-dlp openai-whisper torch torchaudio")
        return gr.update(value="\n".join(test_results), visible=True)
    check_req_btn.click(
        fn=show_requirements,
        outputs=[requirements_output]
    )
    test_btn.click(
        fn=test_system,
        outputs=[test_output]
    )
    process_btn.click(
        fn=process_video,
        inputs=[url_input, cookies_input],
        outputs=[transcript_output, stock_info_output, status_output],
        show_progress=True
    )

    # Footer
    gr.Markdown("""
---
**📢 Disclaimer:** This tool is for educational and research purposes only.
The extracted information should not be considered as financial advice.
Always conduct your own research and consult with financial professionals before making investment decisions.
""")
# Launch configuration for Gradio Cloud
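# Note: server_name="0.0.0.0" binds to all network interfaces, and 7860 is Gradio's
# default port (the one Hugging Face Spaces expects by default).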
if __name__ == "__main__": | |
demo.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=False, | |
debug=False, | |
show_error=True, | |
quiet=False | |
) |