developer28's picture
Update app.py
9518e76 verified
raw
history blame
13.4 kB
import os
import tempfile
import gradio as gr
import re
import sys
# Optional heavy dependencies: a missing package must not prevent the UI from
# starting, so each import is probed and the outcome is recorded in a flag
# that the rest of the module checks before using the package.
try:
    from yt_dlp import YoutubeDL
except ImportError as exc:
    YT_DLP_AVAILABLE = False
    print(f"yt-dlp import error: {exc}")
else:
    YT_DLP_AVAILABLE = True

try:
    import whisper
except ImportError as exc:
    WHISPER_AVAILABLE = False
    print(f"whisper import error: {exc}")
else:
    WHISPER_AVAILABLE = True

# Startup diagnostics (visible in the hosting platform's logs).
print(f"Python version: {sys.version}")
print(f"yt-dlp available: {YT_DLP_AVAILABLE}")
print(f"whisper available: {WHISPER_AVAILABLE}")
def get_cookies_path():
    """Locate a ``cookies.txt`` file for yt-dlp.

    The current working directory is checked first, then the directory that
    contains this script.

    Returns:
        The relative name ``'cookies.txt'`` when it exists in the working
        directory, otherwise the path next to this script when that exists,
        otherwise ``None``.
    """
    cwd_candidate = 'cookies.txt'
    if os.path.exists(cwd_candidate):
        return cwd_candidate

    # Fall back to the folder this module lives in.
    beside_script = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), 'cookies.txt'
    )
    return beside_script if os.path.exists(beside_script) else None
def download_audio(url):
    """Download the audio track of a YouTube video into a fresh temp directory.

    The video's metadata is fetched first so that private, premium-only and
    live videos are rejected before any download is attempted.

    Args:
        url: URL of the video to download.

    Returns:
        Path of the downloaded audio file. The file sits alone in its own
        temporary directory; the caller is responsible for deleting it.

    Raises:
        Exception: if yt-dlp is unavailable, the video cannot be accessed, or
            the download fails. The message is a user-readable explanation and
            the underlying error is chained as ``__cause__``. On failure the
            temporary directory is removed before raising.
    """
    import shutil  # local import: only needed to clean up after a failure

    if not YT_DLP_AVAILABLE:
        raise Exception("yt-dlp is not available. Please check the installation.")

    temp_dir = None
    try:
        # Each download gets its own directory so the result is easy to find
        # and easy to delete.
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, "audio")

        cookies_path = get_cookies_path()

        ydl_opts = {
            'format': 'bestaudio[ext=m4a]/bestaudio/best',
            'outtmpl': output_path + '.%(ext)s',
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'ignoreerrors': False,
            # Browser-like user agent to avoid bot detection
            'http_headers': {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            },
            # Retry with exponential back-off (2, 4, 8 ... seconds)
            'extractor_retries': 3,
            'fragment_retries': 3,
            'retry_sleep_functions': {'http': lambda n: 2 ** n},
        }

        if cookies_path:
            ydl_opts['cookiefile'] = cookies_path
            print(f"Using cookies from: {cookies_path}")
        else:
            print("No cookies.txt found - proceeding without cookies")

        with YoutubeDL(ydl_opts) as ydl:
            # Metadata-only pass: fail early with a clear reason.
            info_dict = ydl.extract_info(url, download=False) or {}

            availability = info_dict.get('availability')
            if availability == 'private':
                raise Exception("Video is private")
            if availability == 'premium_only':
                raise Exception("Video requires premium subscription")
            if info_dict.get('live_status') == 'is_live':
                raise Exception("Cannot download live streams")

            ydl.download([url])

        # Preferred containers first ...
        for ext in ['.m4a', '.webm', '.mp4', '.mp3']:
            potential_file = output_path + ext
            if os.path.exists(potential_file):
                print(f"Successfully downloaded: {potential_file}")
                return potential_file

        # ... then whatever else the 'bestaudio/best' fallback produced
        # (e.g. .opus/.ogg), skipping yt-dlp's partial/bookkeeping files.
        for name in sorted(os.listdir(temp_dir)):
            if name.startswith("audio.") and not name.endswith(('.part', '.ytdl')):
                potential_file = os.path.join(temp_dir, name)
                print(f"Successfully downloaded: {potential_file}")
                return potential_file

        raise FileNotFoundError("Downloaded audio file not found")

    except Exception as e:
        # Nothing usable was produced: do not leave the temp directory behind.
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)

        # Translate common yt-dlp failures into actionable messages.
        error_msg = str(e)
        if "Sign in to confirm your age" in error_msg:
            raise Exception("Video is age-restricted. Please use a different video or update your cookies.") from e
        elif "Private video" in error_msg:
            raise Exception("Video is private and cannot be accessed.") from e
        elif "This video is unavailable" in error_msg:
            raise Exception("Video is unavailable or has been removed.") from e
        elif "blocked" in error_msg.lower():
            raise Exception("Access to this video is blocked. Try using updated cookies or a different video.") from e
        else:
            raise Exception(f"Failed to download audio: {error_msg}") from e
def transcribe_audio(file_path):
    """Run Whisper speech-to-text on an audio file and return the transcript.

    Args:
        file_path: Path of the audio file to transcribe.

    Returns:
        The ``"text"`` entry of Whisper's transcription result.

    Raises:
        Exception: if Whisper is not installed, or if loading the model or
            transcribing fails (the original error text is embedded in the
            message).
    """
    if WHISPER_AVAILABLE:
        try:
            # "tiny" is the smallest model; chosen to keep memory usage low.
            tiny_model = whisper.load_model("tiny")
            return tiny_model.transcribe(file_path)["text"]
        except Exception as err:
            raise Exception(f"Failed to transcribe audio: {str(err)}")

    raise Exception("OpenAI Whisper is not available. Please check the installation.")
def extract_stock_info_simple(text):
    """Summarise stock-related mentions in a transcript using regex heuristics.

    Looks for capitalised names (possible companies), 2-5 letter upper-case
    tokens (possible tickers), dollar amounts, and trading keywords, then
    lists up to five sentences that contain both a trading keyword and one of
    the first five distinct tickers.

    Every list is de-duplicated in order of first appearance *before* being
    capped at ten entries, so the report is deterministic and the cap applies
    to distinct values.

    Args:
        text: Transcript to scan.

    Returns:
        A human-readable multi-line report. This function never raises: on
        any internal error (e.g. ``text`` is not a string) it returns a string
        starting with ``"Error extracting stock info: "``.
    """

    def _unique(items, limit=10):
        # dict preserves insertion order -> stable "first seen" ordering.
        return list(dict.fromkeys(items))[:limit]

    try:
        companies = _unique(re.findall(
            r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*(?:\s+(?:Inc|Corp|Company|Ltd)\.?)?',
            text,
        ))
        symbols = _unique(re.findall(r'\b[A-Z]{2,5}\b', text))
        # Accept thousands separators and 1-2 decimals: $5, $12.5, $1,250.50
        prices = _unique(re.findall(r'\$\d+(?:,\d{3})*(?:\.\d{1,2})?', text))
        # Keywords are matched case-insensitively, so normalise case and
        # whitespace to avoid listing "Buy" and "buy" separately.
        actions = _unique(
            ' '.join(match.lower().split())
            for match in re.findall(
                r'\b(?:buy|sell|hold|bullish|bearish|target|stop\s+loss)\b',
                text,
                re.IGNORECASE,
            )
        )

        parts = ["=== EXTRACTED STOCK INFORMATION ===\n\n"]
        if companies:
            parts.append(f"📊 Mentioned Companies: {', '.join(companies)}\n\n")
        if symbols:
            parts.append(f"🔤 Potential Stock Symbols: {', '.join(symbols)}\n\n")
        if prices:
            parts.append(f"💰 Price Mentions: {', '.join(prices)}\n\n")
        if actions:
            parts.append(f"📈 Trading Actions: {', '.join(actions)}\n\n")

        # Sentences that pair a trading keyword with a candidate ticker.
        keywords = ('buy', 'sell', 'target', 'recommendation')
        top_symbols = symbols[:5]
        recommendations = []
        # Split on full stops that are not decimal points, so "$12.50" does
        # not cut a sentence in half.
        for sentence in re.split(r'\.(?!\d)', text):
            lowered = sentence.lower()
            if any(word in lowered for word in keywords) and any(
                symbol in sentence for symbol in top_symbols
            ):
                recommendations.append(sentence.strip())

        if recommendations:
            parts.append("🎯 Potential Recommendations:\n")
            parts.extend(f"• {rec}\n" for rec in recommendations[:5])

        if not any([companies, symbols, prices, actions]):
            parts.append("⚠️ No clear stock recommendations found in the transcript.\n")
            parts.append("This might be because:\n")
            parts.append("• The video doesn't contain stock recommendations\n")
            parts.append("• The audio quality was poor\n")
            parts.append("• The content is not in English\n")

        return "".join(parts)

    except Exception as e:
        # Deliberate best-effort: the caller displays this string in the UI.
        return f"Error extracting stock info: {str(e)}"
def cleanup_file(file_path):
    """Best-effort removal of a temporary file and its (now empty) directory.

    Filesystem errors are ignored on purpose: this runs from a ``finally``
    block and must not mask the result or error of the main operation. Only
    ``OSError`` is suppressed, so ``KeyboardInterrupt``/``SystemExit`` still
    propagate.

    Args:
        file_path: Path of the file to delete; ``None`` or ``""`` is a no-op.
    """
    if not file_path:
        return

    try:
        if not os.path.exists(file_path):
            return
        os.remove(file_path)
    except OSError:
        return

    # Remove the containing directory too, but only if nothing else is in it
    # (os.rmdir refuses to delete a non-empty directory).
    try:
        os.rmdir(os.path.dirname(file_path))
    except OSError:
        pass
def system_test():
    """Run quick self-checks and return a newline-joined status report.

    Checks, in order: yt-dlp availability and instantiation, Whisper
    availability and import, temp-file create/write/delete, and presence of
    ``cookies.txt``.

    Returns:
        One status line per check, joined with ``"\\n"``. Failures are
        reported in the text; this function does not raise for them.
    """
    results = []

    # yt-dlp
    if YT_DLP_AVAILABLE:
        results.append("✅ yt-dlp: Available")
        try:
            # Context manager so the probe instance is closed again.
            with YoutubeDL({'quiet': True}):
                pass
            results.append("✅ yt-dlp: Can create YoutubeDL instance")
        except Exception as e:
            results.append(f"❌ yt-dlp: Cannot create instance - {e}")
    else:
        results.append("❌ yt-dlp: Not available")

    # Whisper
    if WHISPER_AVAILABLE:
        results.append("✅ Whisper: Available (Type: openai-whisper)")
        try:
            import whisper  # noqa: F401 - re-import only to report a failure
            results.append("✅ Whisper: OpenAI Whisper can be imported")
        except Exception as e:
            results.append(f"❌ Whisper: Cannot import - {e}")
    else:
        results.append("❌ Whisper: Not available")

    # File operations: create, write and delete a temp file. The context
    # manager deletes the file on exit, so a failed write cannot leak it.
    try:
        with tempfile.NamedTemporaryFile() as temp_file:
            temp_file.write(b"test")
            temp_file.flush()
        results.append("✅ File operations: Working")
    except Exception as e:
        results.append(f"❌ File operations: Failed - {e}")

    # Cookies
    cookies_path = get_cookies_path()
    if cookies_path:
        results.append(f"✅ Cookies: Found at {cookies_path}")
    else:
        results.append("⚠️ Cookies: Not found (may cause bot detection issues)")

    return "\n".join(results)
def _is_youtube_url(url):
    """Return True if *url*'s host is youtube.com / youtu.be or a subdomain."""
    from urllib.parse import urlparse  # local import: only used here

    # Allow scheme-less input such as "youtube.com/watch?v=..." to be parsed.
    candidate = url if "://" in url else f"https://{url}"
    try:
        parsed = urlparse(candidate)
        host = (parsed.hostname or "").lower()
    except ValueError:
        return False
    if parsed.scheme.lower() not in ("http", "https"):
        return False
    return host in ("youtube.com", "youtu.be") or host.endswith(
        (".youtube.com", ".youtu.be")
    )


def process_video(url, progress=gr.Progress()):
    """Download, transcribe and analyse a YouTube video for the Gradio UI.

    Args:
        url: YouTube URL entered by the user. Surrounding whitespace is
            ignored.
        progress: Gradio progress tracker; called with a fraction and a
            description at each pipeline stage.

    Returns:
        A ``(transcript, stock_details)`` tuple of strings. On any failure
        the first element carries a user-readable message and the second is
        ``""``; this function does not raise.
    """
    if not YT_DLP_AVAILABLE:
        return "Error: yt-dlp is not installed properly. Please check the requirements.", ""
    if not WHISPER_AVAILABLE:
        return "Error: OpenAI Whisper is not installed properly. Please check the requirements.", ""
    if not url or not url.strip():
        return "Please provide a valid YouTube URL", ""

    # The textbox is multi-line, so strip stray newlines/spaces before use.
    url = url.strip()

    audio_path = None
    try:
        # Validate the *host*, not a substring: the URL is untrusted input
        # that is handed to a downloader, and "evil.example/?x=youtube.com"
        # must not pass.
        if not _is_youtube_url(url):
            return "Please provide a valid YouTube URL", ""

        progress(0.1, desc="Downloading audio...")
        audio_path = download_audio(url)

        progress(0.5, desc="Transcribing audio...")
        transcript = transcribe_audio(audio_path)
        if not transcript.strip():
            return "No speech detected in the video", ""

        progress(0.8, desc="Extracting stock information...")
        stock_details = extract_stock_info_simple(transcript)

        progress(1.0, desc="Complete!")
        return transcript, stock_details

    except Exception as e:
        # UI boundary: turn every failure into a message for the user.
        error_msg = f"Error processing video: {str(e)}"
        return error_msg, ""
    finally:
        # Always remove the downloaded audio (no-op when nothing was
        # downloaded).
        cleanup_file(audio_path)
# Gradio interface: a collapsible system-status panel, a URL input with an
# action button, and two side-by-side result boxes (transcript / extraction).
# NOTE: Markdown bodies are kept flush-left on purpose so the rendered text
# does not depend on the component stripping Python-source indentation.
with gr.Blocks(
    title="Stock Recommendation Extractor",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1200px;
        margin: auto;
    }
    """
) as demo:
    gr.Markdown("""
# 📈 Stock Recommendation Extractor from YouTube
Extract stock recommendations and trading information from YouTube videos using AI transcription.
**How it works:**
1. Downloads audio from YouTube video
2. Transcribes using OpenAI Whisper
3. Extracts stock-related information
**⚠️ Disclaimer:** This is for educational purposes only. Always do your own research!
""")

    # System status: populated once at build time, refreshable on demand.
    with gr.Accordion("🧪 System Status", open=False):
        system_status = gr.Textbox(
            value=system_test(),
            label="System Test Results",
            lines=10,
            interactive=False
        )
        test_btn = gr.Button("🔄 Re-run System Test")
        test_btn.click(fn=system_test, outputs=system_status)

    with gr.Row():
        with gr.Column(scale=1):
            url_input = gr.Textbox(
                label="📺 YouTube URL",
                placeholder="https://www.youtube.com/watch?v=...",
                lines=2
            )
            process_btn = gr.Button(
                "🚀 Extract Stock Information",
                variant="primary",
                size="lg"
            )
            gr.Markdown("""
### 💡 Tips:
- Works best with financial YouTube channels
- Ensure video has clear audio
- English content works best
- If you get bot detection errors, try updating cookies.txt
""")

    with gr.Row():
        with gr.Column():
            transcript_output = gr.Textbox(
                label="📝 Full Transcript",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )
        with gr.Column():
            stock_info_output = gr.Textbox(
                label="📊 Extracted Stock Information",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )

    # Event handlers
    process_btn.click(
        fn=process_video,
        inputs=[url_input],
        outputs=[transcript_output, stock_info_output],
        show_progress=True
    )

    # Example section
    gr.Markdown("### 📋 Example URLs (Replace with actual financial videos)")
    gr.Examples(
        examples=[
            ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"],
        ],
        inputs=[url_input],
        label="Click to try example"
    )

if __name__ == "__main__":
    demo.launch()