developer28's picture
Update app.py
24a0a36 verified
raw
history blame
17.9 kB
import os
import re
import shutil
import sys
import tempfile
from urllib.parse import urlparse

import gradio as gr
# Optional third-party dependencies: each import is attempted once and the
# outcome is recorded in a module-level flag instead of aborting the import
# of this module.
try:
    from yt_dlp import YoutubeDL
except ImportError as import_error:
    YT_DLP_AVAILABLE = False
    print(f"yt-dlp import error: {import_error}")
else:
    YT_DLP_AVAILABLE = True

try:
    import whisper
except ImportError as import_error:
    WHISPER_AVAILABLE = False
    print(f"whisper import error: {import_error}")
else:
    WHISPER_AVAILABLE = True

# Startup diagnostics, printed once when the module is loaded.
print(f"Python version: {sys.version}")
print(f"yt-dlp available: {YT_DLP_AVAILABLE}")
print(f"whisper available: {WHISPER_AVAILABLE}")
def get_cookies_path():
    """Locate a ``cookies.txt`` file.

    The current working directory is checked first, then the directory
    that contains this script.

    Returns:
        ``'cookies.txt'`` (relative) when the file exists in the working
        directory, otherwise the path next to this script when it exists
        there, otherwise ``None``.
    """
    filename = 'cookies.txt'
    if os.path.exists(filename):
        return filename

    beside_script = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), filename
    )
    return beside_script if os.path.exists(beside_script) else None
def _raise_if_not_downloadable(info_dict):
    """Raise when the extracted metadata says the video cannot be fetched."""
    availability_errors = {
        'private': "Video is private and cannot be accessed",
        'premium_only': "Video requires premium subscription",
        'subscriber_only': "Video is only available to channel subscribers",
        'needs_auth': "Video requires authentication - try updating cookies",
    }
    availability = info_dict.get('availability')
    if availability in availability_errors:
        raise Exception(availability_errors[availability])

    live_status = info_dict.get('live_status')
    if live_status == 'is_live':
        raise Exception("Cannot download live streams")
    if live_status == 'was_live':
        print("Note: This was a live stream, trying to download recorded version...")


def _find_downloaded_audio(temp_dir, output_path):
    """Return the downloaded file inside *temp_dir*, or None if there is none."""
    # Preferred containers first, in the same priority order as before.
    for ext in ('.m4a', '.webm', '.mp4', '.mp3'):
        candidate = output_path + ext
        if os.path.exists(candidate):
            return candidate
    # The format selector ends in 'bestaudio/best', so the extension is not
    # limited to the four above; accept any finished "audio.<ext>" file.
    for name in sorted(os.listdir(temp_dir)):
        if name.startswith("audio.") and not name.endswith((".part", ".ytdl")):
            return os.path.join(temp_dir, name)
    return None


def _friendly_download_error(error_msg):
    """Map a raw error message to the user-facing text shown in the UI."""
    # NOTE(review): the bullet characters in these literals look mis-encoded
    # in this copy of the file; they are reproduced unchanged - confirm
    # against the original source before altering them.
    if "Sign in to confirm your age" in error_msg:
        return "❌ Video is age-restricted. Please use a different video or update your cookies with an authenticated session."
    if "Private video" in error_msg:
        return "❌ Video is private and cannot be accessed."
    if "This video is unavailable" in error_msg or "Video unavailable" in error_msg:
        return "❌ Video is unavailable. This could be due to:\nβ€’ Geographic restrictions\nβ€’ Content removed by uploader\nβ€’ Copyright issues\nβ€’ Try a different video"
    if "This content isn't available" in error_msg:
        return "❌ Content not available in your region or has been restricted. Try:\nβ€’ Using a VPN\nβ€’ Different video\nβ€’ Updating cookies"
    if "blocked" in error_msg.lower():
        return "❌ Access blocked. Try using updated cookies or a different video."
    if "HTTP Error 403" in error_msg:
        return "❌ Access forbidden. Video may be region-locked or require authentication."
    if "HTTP Error 404" in error_msg:
        return "❌ Video not found. It may have been deleted or the URL is incorrect."
    return f"❌ Download failed: {error_msg}"


def download_audio(url):
    """Download the audio track of a YouTube video into a fresh temp directory.

    Metadata is probed first so that private, restricted and currently-live
    videos are rejected before any download starts. If the probe itself
    fails, a direct download is still attempted.

    Args:
        url: Video URL handed to yt-dlp.

    Returns:
        Path of the downloaded audio file. On success the temporary
        directory is left in place; it is removed here only on failure.

    Raises:
        Exception: If yt-dlp is unavailable, the video is not downloadable,
            or the download fails. The message is user-facing and the
            original error is chained as ``__cause__``.
    """
    if not YT_DLP_AVAILABLE:
        raise Exception("yt-dlp is not available. Please check the installation.")

    temp_dir = None
    try:
        # Create a temporary directory for downloads
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, "audio")

        cookies_path = get_cookies_path()

        ydl_opts = {
            'format': 'bestaudio[ext=m4a]/bestaudio/best',
            'outtmpl': output_path + '.%(ext)s',
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'ignoreerrors': False,
            # Retry settings intended to reduce bot-detection failures
            'extractor_retries': 3,
            'fragment_retries': 3,
            'retry_sleep_functions': {'http': lambda n: min(2 ** n, 30)},
            # Geo bypass options
            'geo_bypass': True,
            'geo_bypass_country': 'US',
            # Browser-like request headers. The dict literal used to define
            # 'http_headers' twice; only this (later) definition ever took
            # effect, so it is the one kept.
            'http_headers': {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'en-us,en;q=0.5',
                'Accept-Encoding': 'gzip,deflate',
                'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            },
        }

        if cookies_path:
            ydl_opts['cookiefile'] = cookies_path
            print(f"Using cookies from: {cookies_path}")
        else:
            print("No cookies.txt found - proceeding without cookies")

        with YoutubeDL(ydl_opts) as ydl:
            try:
                # Probe metadata first to check whether the video is available
                info_dict = ydl.extract_info(url, download=False)
            except Exception as extract_error:
                # Only a failed probe triggers the fallback; the availability
                # checks below must not be swallowed by it.
                print(f"Info extraction failed: {extract_error}")
                print("Attempting direct download...")
                info_dict = None

            if info_dict:
                _raise_if_not_downloadable(info_dict)

            ydl.download([url])

        downloaded = _find_downloaded_audio(temp_dir, output_path)
        if downloaded is None:
            raise FileNotFoundError("Downloaded audio file not found")
        print(f"Successfully downloaded: {downloaded}")
        return downloaded
    except Exception as e:
        # Nothing usable was produced, so do not leave the temp dir behind.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        raise Exception(_friendly_download_error(str(e))) from e
def test_video_access(url):
    """Check whether a video's metadata can be read, without downloading it.

    Args:
        url: Video URL handed to yt-dlp.

    Returns:
        A multi-line status string: a success line followed by whichever of
        title, duration, availability and age limit are present in the
        metadata, or a single failure line containing the error text. This
        function does not raise.
    """
    if not YT_DLP_AVAILABLE:
        # Give the same installation hint as the download path instead of
        # surfacing a NameError for the missing YoutubeDL class.
        return "❌ Video access test failed: yt-dlp is not available. Please check the installation."

    try:
        cookies_path = get_cookies_path()
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'skip_download': True,
        }
        if cookies_path:
            ydl_opts['cookiefile'] = cookies_path

        with YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=False)

        status = "βœ… Video accessible"
        details = []

        title = info_dict.get('title')
        if title:
            # Append an ellipsis only when the title was actually shortened.
            shown_title = title if len(title) <= 50 else f"{title[:50]}..."
            details.append(f"Title: {shown_title}")
        if info_dict.get('duration'):
            details.append(f"Duration: {info_dict['duration']} seconds")
        if info_dict.get('availability'):
            details.append(f"Availability: {info_dict['availability']}")
        if info_dict.get('age_limit'):
            details.append(f"Age limit: {info_dict['age_limit']}+")

        return status + "\n" + "\n".join(details)
    except Exception as e:
        return f"❌ Video access test failed: {str(e)}"
def transcribe_audio(file_path):
    """Run Whisper speech-to-text on an audio file and return the text.

    Args:
        file_path: Path of the audio file to transcribe.

    Returns:
        The ``"text"`` entry of the Whisper transcription result.

    Raises:
        Exception: If Whisper is not importable, or if loading the model or
            transcribing fails (the original error text is included).
    """
    if not WHISPER_AVAILABLE:
        raise Exception("OpenAI Whisper is not available. Please check the installation.")

    try:
        # The "tiny" model is used to keep memory usage low.
        transcription = whisper.load_model("tiny").transcribe(file_path)
        return transcription["text"]
    except Exception as err:
        raise Exception(f"Failed to transcribe audio: {err}")
def _unique_in_order(items, limit):
    """Return the first *limit* distinct items, keeping first-seen order."""
    return list(dict.fromkeys(items))[:limit]


def extract_stock_info_simple(text):
    """Build a plain-text report of stock-related mentions found in *text*.

    Pure regular-expression matching is used: capitalised word runs are
    treated as company names, 2-5 letter all-caps words as ticker symbols,
    ``$`` amounts as prices, and a fixed vocabulary as trading actions.
    Sentences that contain both a trading keyword and one of the first five
    distinct symbols are listed as potential recommendations.

    Args:
        text: Transcript to scan.

    Returns:
        The formatted report. If anything goes wrong (for example *text* is
        not a string) an ``"Error extracting stock info: ..."`` string is
        returned instead of raising.
    """
    # NOTE(review): several emoji/bullet literals below look mis-encoded in
    # this copy of the file; they are reproduced unchanged - confirm against
    # the original source before altering them.
    try:
        # De-duplicate *before* truncating so repeated mentions cannot crowd
        # out later distinct items, and keep first-seen order so the report
        # is identical from run to run (set() ordering is not).
        companies = _unique_in_order(
            re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*(?:\s+(?:Inc|Corp|Company|Ltd)\.?)?', text),
            10,
        )
        symbols = _unique_in_order(re.findall(r'\b[A-Z]{2,5}\b', text), 10)
        prices = _unique_in_order(re.findall(r'\$\d+(?:\.\d{2})?', text), 10)
        actions = _unique_in_order(
            re.findall(r'\b(?:buy|sell|hold|bullish|bearish|target|stop\s+loss)\b', text, re.IGNORECASE),
            10,
        )

        parts = ["=== EXTRACTED STOCK INFORMATION ===\n\n"]
        if companies:
            parts.append(f"πŸ“Š Mentioned Companies: {', '.join(companies)}\n\n")
        if symbols:
            parts.append(f"πŸ”€ Potential Stock Symbols: {', '.join(symbols)}\n\n")
        if prices:
            parts.append(f"πŸ’° Price Mentions: {', '.join(prices)}\n\n")
        if actions:
            parts.append(f"πŸ“ˆ Trading Actions: {', '.join(actions)}\n\n")

        # Look for sentences that pair a trading keyword with a symbol.
        keywords = ('buy', 'sell', 'target', 'recommendation')
        top_symbols = symbols[:5]
        recommendations = []
        # Split on periods that are not followed by a digit so that decimal
        # prices such as "$150.50" stay inside their sentence.
        for sentence in re.split(r'\.(?!\d)', text):
            lowered = sentence.lower()
            if any(word in lowered for word in keywords) and any(
                symbol in sentence for symbol in top_symbols
            ):
                recommendations.append(sentence.strip())

        if recommendations:
            parts.append("🎯 Potential Recommendations:\n")
            parts.extend(f"β€’ {rec}\n" for rec in recommendations[:5])

        if not any([companies, symbols, prices, actions]):
            parts.append("⚠️ No clear stock recommendations found in the transcript.\n")
            parts.append("This might be because:\n")
            parts.append("β€’ The video doesn't contain stock recommendations\n")
            parts.append("β€’ The audio quality was poor\n")
            parts.append("β€’ The content is not in English\n")

        return "".join(parts)
    except Exception as e:
        return f"Error extracting stock info: {str(e)}"
def cleanup_file(file_path):
    """Best-effort removal of a temporary file and its directory.

    The file is deleted if it exists; afterwards its parent directory is
    removed too, but only when that leaves it empty. Failures never
    propagate to the caller.

    Args:
        file_path: Path of the file to delete. ``None`` or an empty value is
            ignored.
    """
    if not file_path:
        return

    try:
        os.remove(file_path)
    except FileNotFoundError:
        # Nothing to clean up.
        return
    except (OSError, TypeError, ValueError) as error:
        # Stay best-effort, but say why the file was left behind instead of
        # swallowing every exception (including KeyboardInterrupt).
        print(f"Warning: could not remove temporary file {file_path}: {error}")
        return

    try:
        # os.rmdir refuses to delete a non-empty directory, which is the
        # expected outcome when other files still live next to this one.
        os.rmdir(os.path.dirname(file_path))
    except OSError:
        pass
def system_test():
    """Run quick self-checks and return their results as one string.

    Four things are checked, in this order: yt-dlp, Whisper, temporary-file
    operations, and the presence of a cookies file.

    Returns:
        The individual result lines joined with newlines.
    """
    report = []

    # yt-dlp: import flag, then whether a downloader object can be built.
    if not YT_DLP_AVAILABLE:
        report.append("❌ yt-dlp: Not available")
    else:
        report.append("βœ… yt-dlp: Available")
        try:
            YoutubeDL({'quiet': True})
            report.append("βœ… yt-dlp: Can create YoutubeDL instance")
        except Exception as exc:
            report.append(f"❌ yt-dlp: Cannot create instance - {exc}")

    # Whisper: import flag, then a fresh import attempt.
    if not WHISPER_AVAILABLE:
        report.append("❌ Whisper: Not available")
    else:
        report.append("βœ… Whisper: Available (Type: openai-whisper)")
        try:
            import whisper
            report.append("βœ… Whisper: OpenAI Whisper can be imported")
        except Exception as exc:
            report.append(f"❌ Whisper: Cannot import - {exc}")

    # File operations: write a scratch file, close it, delete it.
    try:
        with tempfile.NamedTemporaryFile(delete=False) as scratch:
            scratch.write(b"test")
        os.remove(scratch.name)
        report.append("βœ… File operations: Working")
    except Exception as exc:
        report.append(f"❌ File operations: Failed - {exc}")

    # Cookies: report where the cookies file was found, if anywhere.
    cookies_path = get_cookies_path()
    report.append(
        f"βœ… Cookies: Found at {cookies_path}"
        if cookies_path
        else "⚠️ Cookies: Not found (may cause bot detection issues)"
    )

    return "\n".join(report)
def _is_youtube_url(url):
    """Return True when the host of *url* is youtube.com, youtu.be or a subdomain."""
    # Allow scheme-less input such as "youtube.com/watch?v=..." by giving
    # urlparse a scheme to anchor the host on.
    candidate = url if "://" in url else f"https://{url}"
    try:
        host = (urlparse(candidate).hostname or "").lower()
    except ValueError:
        return False
    return any(
        host == domain or host.endswith("." + domain)
        for domain in ("youtube.com", "youtu.be")
    )


def process_video(url, progress=gr.Progress()):
    """Download, transcribe and analyse one YouTube video.

    Args:
        url: URL typed by the user. Surrounding whitespace is ignored.
        progress: Gradio progress tracker, called with a fraction and a
            description at each stage.

    Returns:
        A ``(transcript, stock_details)`` tuple of strings. When a step
        fails or the input is rejected, the first element carries the
        message to display and the second is ``""``. This function does
        not raise.
    """
    # Check if required packages are available
    if not YT_DLP_AVAILABLE:
        return "Error: yt-dlp is not installed properly. Please check the requirements.", ""
    if not WHISPER_AVAILABLE:
        return "Error: OpenAI Whisper is not installed properly. Please check the requirements.", ""
    if not url or not url.strip():
        return "Please provide a valid YouTube URL", ""

    # The input box is multi-line, so drop stray whitespace/newlines before
    # the URL is validated and handed to the downloader.
    url = url.strip()

    audio_path = None
    try:
        # Validate by hostname: a substring test would also accept URLs
        # that merely mention "youtube.com" in their path or query string.
        if not _is_youtube_url(url):
            return "Please provide a valid YouTube URL", ""

        progress(0.1, desc="Downloading audio...")
        audio_path = download_audio(url)

        progress(0.5, desc="Transcribing audio...")
        transcript = transcribe_audio(audio_path)
        if not transcript.strip():
            return "No speech detected in the video", ""

        progress(0.8, desc="Extracting stock information...")
        stock_details = extract_stock_info_simple(transcript)

        progress(1.0, desc="Complete!")
        return transcript, stock_details
    except Exception as e:
        error_msg = f"Error processing video: {str(e)}"
        return error_msg, ""
    finally:
        # Runs on every exit path above; audio_path is still None when the
        # download never completed.
        cleanup_file(audio_path)
# Build the Gradio interface. Component creation order and `with` nesting
# define the page layout.
# NOTE(review): this copy of the file had lost its indentation; the nesting
# below is reconstructed - confirm the layout against the original source.
# NOTE(review): several emoji in the labels/markdown look mis-encoded in this
# copy; the string literals are left exactly as found.
with gr.Blocks(
    title="Stock Recommendation Extractor",
    theme=gr.themes.Soft(),
    css="""
.gradio-container {
max-width: 1200px;
margin: auto;
}
"""
) as demo:
    # Page header and usage summary.
    gr.Markdown("""
# πŸ“ˆ Stock Recommendation Extractor from YouTube
Extract stock recommendations and trading information from YouTube videos using AI transcription.
**How it works:**
1. Downloads audio from YouTube video
2. Transcribes using OpenAI Whisper
3. Extracts stock-related information
**⚠️ Disclaimer:** This is for educational purposes only. Always do your own research!
""")
    # Collapsible system status panel. system_test() is called once here,
    # while the UI is being built, to fill in the initial value.
    with gr.Accordion("πŸ§ͺ System Status", open=False):
        system_status = gr.Textbox(
            value=system_test(),
            label="System Test Results",
            lines=10,
            interactive=False
        )
        # NOTE(review): the name `test_btn` is rebound further down to the
        # "Test Video Access" button. This click handler is registered on the
        # re-run button before that rebinding happens.
        test_btn = gr.Button("πŸ”„ Re-run System Test")
        test_btn.click(fn=system_test, outputs=system_status)
    # Input area: URL box, action buttons, access-test result and tips.
    with gr.Row():
        with gr.Column(scale=1):
            url_input = gr.Textbox(
                label="πŸ“Ί YouTube URL",
                placeholder="https://www.youtube.com/watch?v=...",
                lines=2
            )
            with gr.Row():
                process_btn = gr.Button(
                    "πŸš€ Extract Stock Information",
                    variant="primary",
                    size="lg"
                )
                test_btn = gr.Button(
                    "πŸ” Test Video Access",
                    variant="secondary"
                )
            # Created hidden; test_and_show() below makes it visible.
            test_result = gr.Textbox(
                label="πŸ“‹ Video Access Test",
                lines=4,
                visible=False
            )
            gr.Markdown("""
### πŸ’‘ Tips:
- **First try "Test Video Access"** to check if video is available
- Works best with financial YouTube channels
- Ensure video has clear audio
- English content works best
- If you get bot detection errors, try updating cookies.txt
### 🎯 Recommended Financial Channels:
- Ben Felix, The Plain Bagel, Two Cents, Graham Stephan
- Make sure videos are public and not age-restricted
""")
    # Handler for the "Test Video Access" button.
    def test_and_show(url):
        """Run the access test and return (text, visibility update) for the result box."""
        if not url:
            return "Please enter a YouTube URL first", gr.update(visible=False)
        result = test_video_access(url)
        return result, gr.update(visible=True)
    # NOTE(review): test_result is listed twice as an output - presumably so
    # the first return value sets its text and the second toggles its
    # visibility. Confirm this is supported by the Gradio version in use.
    test_btn.click(
        fn=test_and_show,
        inputs=[url_input],
        outputs=[test_result, test_result]
    )
    # Output area: transcript and extracted stock information.
    with gr.Row():
        with gr.Column():
            transcript_output = gr.Textbox(
                label="πŸ“ Full Transcript",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )
        with gr.Column():
            stock_info_output = gr.Textbox(
                label="πŸ“Š Extracted Stock Information",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )
    # Main action: process_video returns two values, one per output box.
    process_btn.click(
        fn=process_video,
        inputs=[url_input],
        outputs=[transcript_output, stock_info_output],
        show_progress=True
    )
    # Example section
    gr.Markdown("### πŸ“‹ Example URLs (Replace with actual financial videos)")
    gr.Examples(
        examples=[
            ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"],
        ],
        inputs=[url_input],
        label="Click to try example"
    )
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()