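# Spaces:
# Sleeping
# Sleeping
# NOTE(review): the three lines above look like hosting-page status text that
# was captured together with the file rather than program code -- confirm and
# remove if so.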
import os | |
import tempfile | |
import gradio as gr | |
import re | |
import sys | |
# Optional third-party dependencies.  Each import is attempted once; the
# outcome is stored in a module-level flag so the rest of the app can report
# a readable error instead of failing at start-up.
try:
    from yt_dlp import YoutubeDL
except ImportError as import_error:
    YT_DLP_AVAILABLE = False
    print(f"yt-dlp import error: {import_error}")
else:
    YT_DLP_AVAILABLE = True

try:
    import whisper
except ImportError as import_error:
    WHISPER_AVAILABLE = False
    print(f"whisper import error: {import_error}")
else:
    WHISPER_AVAILABLE = True

# Start-up diagnostics written to stdout.
print(f"Python version: {sys.version}")
print(f"yt-dlp available: {YT_DLP_AVAILABLE}")
print(f"whisper available: {WHISPER_AVAILABLE}")
def get_cookies_path():
    """Locate an optional ``cookies.txt`` file.

    The current working directory is checked first, then the directory that
    contains this script.

    Returns:
        ``'cookies.txt'`` (relative) when the file is in the working
        directory, the script-directory path when it is found there, or
        ``None`` when neither location has the file.
    """
    # isfile rather than exists: a *directory* that happens to be called
    # cookies.txt is not a usable cookie file and must not be returned.
    if os.path.isfile('cookies.txt'):
        return 'cookies.txt'

    # Fall back to the directory this script lives in.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    cookies_path = os.path.join(script_dir, 'cookies.txt')
    if os.path.isfile(cookies_path):
        return cookies_path

    return None
def download_audio(url):
    """Download the audio track for *url* into a fresh temporary directory.

    Metadata is fetched first so that private, premium-only and live videos
    are rejected before any download starts.  When ``get_cookies_path()``
    finds a cookie file it is passed to yt-dlp.

    Args:
        url: URL of the video to download.

    Returns:
        Path of the downloaded audio file.  It lives in a newly created
        temporary directory; the caller is responsible for deleting it.

    Raises:
        Exception: If yt-dlp is unavailable or anything goes wrong while
            downloading.  A few known failure texts are mapped to friendlier
            messages; the underlying error is chained as ``__cause__``.
    """
    if not YT_DLP_AVAILABLE:
        raise Exception("yt-dlp is not available. Please check the installation.")

    # Function-scope import: only this function needs shutil.
    import shutil

    temp_dir = None
    try:
        # Create a temporary directory for downloads
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, "audio")

        # Get cookies path
        cookies_path = get_cookies_path()

        # Base yt-dlp options
        ydl_opts = {
            'format': 'bestaudio[ext=m4a]/bestaudio/best',
            'outtmpl': output_path + '.%(ext)s',
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'ignoreerrors': False,
            # Add user agent to avoid bot detection
            'http_headers': {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            },
            # Add additional options to avoid bot detection
            'extractor_retries': 3,
            'fragment_retries': 3,
            # Exponential back-off between HTTP retries: 2, 4, 8, ... seconds.
            'retry_sleep_functions': {'http': lambda n: 2 ** n},
        }

        # Add cookies if available
        if cookies_path:
            ydl_opts['cookiefile'] = cookies_path
            print(f"Using cookies from: {cookies_path}")
        else:
            print("No cookies.txt found - proceeding without cookies")

        with YoutubeDL(ydl_opts) as ydl:
            # Extract info first to check if video is available
            info_dict = ydl.extract_info(url, download=False)

            # Check if video is available
            if info_dict.get('availability') == 'private':
                raise Exception("Video is private")
            elif info_dict.get('availability') == 'premium_only':
                raise Exception("Video requires premium subscription")
            elif info_dict.get('live_status') == 'is_live':
                raise Exception("Cannot download live streams")

            # Download the audio
            ydl.download([url])

        # Find the downloaded file.  The common extensions are tried first,
        # in their original priority order; after that any other "audio.*"
        # file is accepted, because the 'bestaudio/best' fallbacks can
        # produce containers outside that short list.  Partial-download
        # leftovers are ignored.
        known = [output_path + ext for ext in ('.m4a', '.webm', '.mp4', '.mp3')]
        others = sorted(
            os.path.join(temp_dir, name)
            for name in os.listdir(temp_dir)
            if name.startswith('audio.')
            and not name.endswith(('.part', '.ytdl'))
        )
        for potential_file in known + others:
            if os.path.isfile(potential_file):
                print(f"Successfully downloaded: {potential_file}")
                return potential_file

        raise FileNotFoundError("Downloaded audio file not found")

    except Exception as e:
        # Nothing is returned on this path, so nobody else can clean up the
        # temporary directory -- remove it here to avoid leaking it.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)

        error_msg = str(e)
        if "Sign in to confirm your age" in error_msg:
            raise Exception("Video is age-restricted. Please use a different video or update your cookies.") from e
        elif "Private video" in error_msg:
            raise Exception("Video is private and cannot be accessed.") from e
        elif "This video is unavailable" in error_msg:
            raise Exception("Video is unavailable or has been removed.") from e
        elif "blocked" in error_msg.lower():
            raise Exception("Access to this video is blocked. Try using updated cookies or a different video.") from e
        else:
            raise Exception(f"Failed to download audio: {error_msg}") from e
def transcribe_audio(file_path):
    """Run Whisper speech-to-text on an audio file.

    Args:
        file_path: Path of the audio file to transcribe.

    Returns:
        The ``"text"`` entry of the Whisper transcription result.

    Raises:
        Exception: If Whisper is unavailable, or if loading the model or
            transcribing fails.
    """
    if not WHISPER_AVAILABLE:
        raise Exception("OpenAI Whisper is not available. Please check the installation.")

    try:
        # "tiny" is the smallest model; it is used to reduce memory usage.
        transcription = whisper.load_model("tiny").transcribe(file_path)
        return transcription["text"]
    except Exception as err:
        raise Exception(f"Failed to transcribe audio: {err}")
def _first_unique(items, limit=10):
    """Return up to *limit* distinct items, keeping first-seen order."""
    return list(dict.fromkeys(items))[:limit]


def extract_stock_info_simple(text):
    """Build a plain-text report of stock-related mentions found in *text*.

    Regular expressions pick out capitalised names, 2-5 letter upper-case
    tokens, dollar amounts and trading verbs.  Each category lists at most
    ten distinct hits in the order they first appear, so the report is the
    same on every run.  Sentences that contain both a trading keyword and
    one of the first five distinct symbols are listed as potential
    recommendations (at most five).

    Args:
        text: Transcript to scan.

    Returns:
        The formatted report.  If anything raises while building it (for
        example *text* is not a string) a string starting with
        ``"Error extracting stock info: "`` is returned instead.
    """
    # NOTE(review): the icon prefixes in the strings below look mis-encoded
    # (probably emoji damaged in transit).  They are kept exactly as found --
    # confirm the intended characters against the original file.
    try:
        # Look for company names and stock mentions
        companies = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*(?:\s+(?:Inc|Corp|Company|Ltd)\.?)?', text)
        symbols = re.findall(r'\b[A-Z]{2,5}\b', text)
        prices = re.findall(r'\$\d+(?:\.\d{2})?', text)
        actions = re.findall(r'\b(?:buy|sell|hold|bullish|bearish|target|stop\s+loss)\b', text, re.IGNORECASE)

        # Format the extracted information
        parts = ["=== EXTRACTED STOCK INFORMATION ===\n\n"]
        if companies:
            parts.append(f"π Mentioned Companies: {', '.join(_first_unique(companies))}\n\n")
        if symbols:
            parts.append(f"π€ Potential Stock Symbols: {', '.join(_first_unique(symbols))}\n\n")
        if prices:
            parts.append(f"π° Price Mentions: {', '.join(_first_unique(prices))}\n\n")
        if actions:
            parts.append(f"π Trading Actions: {', '.join(_first_unique(actions))}\n\n")

        # Look for specific recommendation patterns: a sentence qualifies when
        # it has a trading keyword and names one of the leading symbols.
        keywords = ('buy', 'sell', 'target', 'recommendation')
        leading_symbols = _first_unique(symbols, 5)
        recommendations = [
            sentence.strip()
            for sentence in text.split('.')
            if any(word in sentence.lower() for word in keywords)
            and any(symbol in sentence for symbol in leading_symbols)
        ]
        if recommendations:
            parts.append("π― Potential Recommendations:\n")
            parts.extend(f"β’ {rec}\n" for rec in recommendations[:5])

        if not any([companies, symbols, prices, actions]):
            parts.append("β οΈ No clear stock recommendations found in the transcript.\n")
            parts.append("This might be because:\n")
            parts.append("β’ The video doesn't contain stock recommendations\n")
            parts.append("β’ The audio quality was poor\n")
            parts.append("β’ The content is not in English\n")

        return "".join(parts)
    except Exception as e:
        return f"Error extracting stock info: {str(e)}"
def cleanup_file(file_path):
    """Best-effort removal of a temporary file and its parent directory.

    The file is deleted if it exists; afterwards the directory that held it
    is removed too, but only when that leaves it empty.  Filesystem errors
    are deliberately ignored because cleanup must never mask the result of
    the work that preceded it.

    Args:
        file_path: Path of the file to delete.  ``None`` or an empty string
            is a no-op.
    """
    if not file_path:
        return

    # Only OSError is swallowed: a bare ``except`` would also hide
    # KeyboardInterrupt and SystemExit.
    try:
        if not os.path.exists(file_path):
            return
        os.remove(file_path)
    except OSError:
        return

    # Also try to remove the directory if it's empty
    try:
        os.rmdir(os.path.dirname(file_path))
    except OSError:
        pass
def system_test():
    """Check the app's runtime prerequisites and describe the outcome.

    Covers four things: yt-dlp (import flag plus constructing a
    ``YoutubeDL``), Whisper (import flag plus re-import), writing and
    deleting a temporary file, and whether a cookie file can be found.

    Returns:
        One status line per check, joined with newlines.
    """
    # NOTE(review): the status-icon prefixes in the strings below look
    # mis-encoded (success and failure lines now start with the same
    # character).  They are kept exactly as found -- confirm the intended
    # characters against the original file.
    results = []

    # Test yt-dlp
    if YT_DLP_AVAILABLE:
        results.append("β yt-dlp: Available")
        try:
            # Context manager so the instance is closed again right away.
            with YoutubeDL({'quiet': True}):
                pass
            results.append("β yt-dlp: Can create YoutubeDL instance")
        except Exception as e:
            results.append(f"β yt-dlp: Cannot create instance - {e}")
    else:
        results.append("β yt-dlp: Not available")

    # Test Whisper
    if WHISPER_AVAILABLE:
        results.append("β Whisper: Available (Type: openai-whisper)")
        try:
            import whisper
            results.append("β Whisper: OpenAI Whisper can be imported")
        except Exception as e:
            results.append(f"β Whisper: Cannot import - {e}")
    else:
        results.append("β Whisper: Not available")

    # Test file operations
    temp_name = None
    try:
        # The with-block guarantees the handle is closed even if the write
        # fails; delete=False lets the explicit os.remove below be the test.
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_name = temp_file.name
            temp_file.write(b"test")
        os.remove(temp_name)
        temp_name = None
        results.append("β File operations: Working")
    except Exception as e:
        results.append(f"β File operations: Failed - {e}")
        # Do not leave the probe file behind when the check itself failed.
        if temp_name is not None:
            try:
                os.remove(temp_name)
            except OSError:
                pass

    # Test cookies
    cookies_path = get_cookies_path()
    if cookies_path:
        results.append(f"β Cookies: Found at {cookies_path}")
    else:
        results.append("β οΈ Cookies: Not found (may cause bot detection issues)")

    return "\n".join(results)
def _is_youtube_url(url):
    """Return True when the host of *url* is youtu.be or youtube.com (or a subdomain)."""
    from urllib.parse import urlparse

    # Scheme-less input such as "youtube.com/watch?v=..." is still accepted;
    # a scheme is added only so that urlparse can identify the host.
    candidate = url if '://' in url else 'https://' + url.lstrip('/')
    try:
        host = (urlparse(candidate).hostname or '').lower()
    except ValueError:
        return False
    return any(
        host == domain or host.endswith('.' + domain)
        for domain in ('youtube.com', 'youtu.be')
    )


def process_video(url, progress=gr.Progress()):
    """Download, transcribe and analyse one YouTube video.

    Args:
        url: Video URL as typed by the user; surrounding whitespace is
            ignored.
        progress: Gradio progress tracker, called with a fraction and a
            description before each stage.

    Returns:
        A ``(transcript, stock_details)`` tuple.  On any failure the first
        element is a human-readable message and the second is ``""``.
    """
    # Check if required packages are available
    if not YT_DLP_AVAILABLE:
        return "Error: yt-dlp is not installed properly. Please check the requirements.", ""
    if not WHISPER_AVAILABLE:
        return "Error: OpenAI Whisper is not installed properly. Please check the requirements.", ""

    # Normalise once so the value that is validated is the value that is used.
    url = (url or "").strip()
    if not url:
        return "Please provide a valid YouTube URL", ""

    # Validate the host itself.  A plain substring test would also accept
    # e.g. "https://example.org/?q=youtube.com" and hand it to the downloader.
    if not _is_youtube_url(url):
        return "Please provide a valid YouTube URL", ""

    audio_path = None
    try:
        # Download audio
        progress(0.1, desc="Downloading audio...")
        audio_path = download_audio(url)

        # Transcribe audio
        progress(0.5, desc="Transcribing audio...")
        transcript = transcribe_audio(audio_path)
        if not transcript.strip():
            return "No speech detected in the video", ""

        # Extract stock information
        progress(0.8, desc="Extracting stock information...")
        stock_details = extract_stock_info_simple(transcript)

        progress(1.0, desc="Complete!")
        return transcript, stock_details
    except Exception as e:
        error_msg = f"Error processing video: {str(e)}"
        return error_msg, ""
    finally:
        # Clean up temporary files
        cleanup_file(audio_path)
# Create Gradio interface
# NOTE(review): the nesting below and the whitespace inside the triple-quoted
# CSS/Markdown strings are reconstructed -- the file reached review without
# indentation or blank lines.  Confirm the layout and the rendered Markdown.
# The icon characters in labels look mis-encoded and are kept as found.
with gr.Blocks(
    title="Stock Recommendation Extractor",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1200px;
        margin: auto;
    }
    """
) as demo:
    # Page header: what the app does, the three processing steps, disclaimer.
    gr.Markdown("""
    # π Stock Recommendation Extractor from YouTube
    Extract stock recommendations and trading information from YouTube videos using AI transcription.
    **How it works:**
    1. Downloads audio from YouTube video
    2. Transcribes using OpenAI Whisper
    3. Extracts stock-related information
    **β οΈ Disclaimer:** This is for educational purposes only. Always do your own research!
    """)

    # Add system test section
    # system_test() is called once here, while the UI is being built, to fill
    # the textbox; the button below calls it again on demand.
    with gr.Accordion("π§ͺ System Status", open=False):
        system_status = gr.Textbox(
            value=system_test(),
            label="System Test Results",
            lines=10,
            interactive=False
        )
        test_btn = gr.Button("π Re-run System Test")
        test_btn.click(fn=system_test, outputs=system_status)

    # Input area: URL box, the action button and usage tips.
    with gr.Row():
        with gr.Column(scale=1):
            url_input = gr.Textbox(
                label="πΊ YouTube URL",
                placeholder="https://www.youtube.com/watch?v=...",
                lines=2
            )
            process_btn = gr.Button(
                "π Extract Stock Information",
                variant="primary",
                size="lg"
            )
            gr.Markdown("""
            ### π‘ Tips:
            - Works best with financial YouTube channels
            - Ensure video has clear audio
            - English content works best
            - If you get bot detection errors, try updating cookies.txt
            """)

    # Output area: transcript in one column, extracted report in the other.
    with gr.Row():
        with gr.Column():
            transcript_output = gr.Textbox(
                label="π Full Transcript",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )
        with gr.Column():
            stock_info_output = gr.Textbox(
                label="π Extracted Stock Information",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )

    # Event handlers
    # process_video returns a 2-tuple; its elements go to the two output
    # boxes in the order listed in ``outputs``.
    process_btn.click(
        fn=process_video,
        inputs=[url_input],
        outputs=[transcript_output, stock_info_output],
        show_progress=True
    )

    # Example section
    gr.Markdown("### π Example URLs (Replace with actual financial videos)")
    gr.Examples(
        examples=[
            ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"],
        ],
        inputs=[url_input],
        label="Click to try example"
    )

# Start the web server only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()