# Hugging Face Space (page status when this file was captured: Sleeping)
import os
import re
import shutil
import sys
import tempfile
from urllib.parse import urlparse

import gradio as gr
# Optional third-party dependencies. A failed import is recorded in a flag
# (and reported on stdout) instead of aborting module import, so callers can
# check the flag and report a readable error.
try:
    from yt_dlp import YoutubeDL
except ImportError as err:
    YT_DLP_AVAILABLE = False
    print(f"yt-dlp import error: {err}")
else:
    YT_DLP_AVAILABLE = True

try:
    import whisper
except ImportError as err:
    WHISPER_AVAILABLE = False
    print(f"whisper import error: {err}")
else:
    WHISPER_AVAILABLE = True

# Startup diagnostics: interpreter version and which optional packages loaded.
print(f"Python version: {sys.version}")
print(f"yt-dlp available: {YT_DLP_AVAILABLE}")
print(f"whisper available: {WHISPER_AVAILABLE}")
def download_audio(url, cookies_file_path=None):
    """Download the audio track of a video with yt-dlp and return its path.

    The file is written into a freshly created temporary directory as
    ``audio.<ext>``. On success the caller owns that file (and directory) and
    is responsible for deleting them; on failure the directory is removed
    here so nothing is leaked.

    Args:
        url: Video URL handed to yt-dlp.
        cookies_file_path: Optional path to a cookies file. It is only passed
            to yt-dlp when the path exists.

    Returns:
        Path of the downloaded audio file.

    Raises:
        Exception: If yt-dlp is unavailable, or if the download fails or no
            output file can be located (message prefixed with
            "Failed to download audio:").
    """
    if not YT_DLP_AVAILABLE:
        raise Exception("yt-dlp is not available. Please check the installation.")

    temp_dir = None
    try:
        # Create a temporary directory for downloads
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, "audio")
        ydl_opts = {
            'format': 'bestaudio[ext=m4a]/bestaudio/best',
            'outtmpl': output_path + '.%(ext)s',
            'quiet': True,
            'no_warnings': True,
        }

        # Add cookies file if provided
        if cookies_file_path and os.path.exists(cookies_file_path):
            ydl_opts['cookiefile'] = cookies_file_path
            print(f"Using cookies file: {cookies_file_path}")

        with YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=True)
            expected_file = ydl.prepare_filename(info_dict)

        # Prefer the name yt-dlp itself reports: it works for any container
        # (opus, ogg, aac, ...) rather than a fixed list of extensions.
        if os.path.isfile(expected_file):
            return expected_file

        # Fallback: take whatever finished "audio.*" file landed in the
        # directory, skipping yt-dlp's in-progress/bookkeeping files.
        for entry in sorted(os.listdir(temp_dir)):
            if entry.startswith("audio.") and not entry.endswith((".part", ".ytdl")):
                return os.path.join(temp_dir, entry)

        raise FileNotFoundError("Downloaded audio file not found")
    except Exception as e:
        # Nothing usable was produced; do not leave the temp directory behind.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        raise Exception(f"Failed to download audio: {str(e)}") from e
def transcribe_audio(file_path):
    """Return the text Whisper recognises in the audio file at ``file_path``.

    Raises:
        Exception: If Whisper is not importable, or if loading the model or
            transcribing fails (message prefixed with
            "Failed to transcribe audio:").
    """
    if not WHISPER_AVAILABLE:
        raise Exception("OpenAI Whisper is not available. Please check the installation.")

    try:
        # "tiny" is the smallest model; it is used to reduce memory usage.
        return whisper.load_model("tiny").transcribe(file_path)["text"]
    except Exception as err:
        raise Exception(f"Failed to transcribe audio: {str(err)}")
def _unique_in_order(items, limit=10):
    """Return the first ``limit`` distinct items, keeping first-seen order."""
    return list(dict.fromkeys(items))[:limit]


def extract_stock_info_simple(text):
    """Build a plain-text report of stock-related mentions found in ``text``.

    Uses regular expressions only: capitalised word runs are reported as
    company names, 2-5 letter upper-case words as ticker symbols, ``$``
    amounts as prices, and a fixed keyword list as trading actions. Sentences
    that contain both a trading keyword and one of the first five symbols are
    listed as potential recommendations.

    Each list is de-duplicated in order of first appearance and capped at ten
    entries, so the report is identical from run to run.

    Args:
        text: Transcript to scan.

    Returns:
        The formatted report, or an "Error extracting stock info: ..." string
        if anything goes wrong (this function does not raise).
    """
    try:
        # Look for company names and stock mentions
        companies = _unique_in_order(
            re.findall(
                r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*(?:\s+(?:Inc|Corp|Company|Ltd)\.?)?',
                text,
            )
        )
        symbols = _unique_in_order(re.findall(r'\b[A-Z]{2,5}\b', text))
        prices = _unique_in_order(re.findall(r'\$\d+(?:\.\d{2})?', text))
        # Matching is case-insensitive, so normalise case/whitespace before
        # de-duplicating ("Buy" and "buy" are the same action).
        actions = _unique_in_order(
            " ".join(match.lower().split())
            for match in re.findall(
                r'\b(?:buy|sell|hold|bullish|bearish|target|stop\s+loss)\b',
                text,
                re.IGNORECASE,
            )
        )

        # Format the extracted information.
        # NOTE(review): the section icons were mis-encoded in the source text;
        # where the original glyph could not be recovered a fitting icon was chosen.
        parts = ["=== EXTRACTED STOCK INFORMATION ===\n\n"]
        if companies:
            parts.append(f"📊 Mentioned Companies: {', '.join(companies)}\n\n")
        if symbols:
            parts.append(f"🔤 Potential Stock Symbols: {', '.join(symbols)}\n\n")
        if prices:
            parts.append(f"💰 Price Mentions: {', '.join(prices)}\n\n")
        if actions:
            parts.append(f"📈 Trading Actions: {', '.join(actions)}\n\n")

        # Look for specific recommendation patterns. Sentences are split after
        # terminal punctuation followed by whitespace, so a decimal point
        # inside a price such as "$150.50" does not break the sentence.
        keywords = ('buy', 'sell', 'target', 'recommendation')
        top_symbols = symbols[:5]
        recommendations = []
        for sentence in re.split(r'(?<=[.!?])\s+', text):
            lowered = sentence.lower()
            if any(word in lowered for word in keywords) and any(
                symbol in sentence for symbol in top_symbols
            ):
                recommendations.append(sentence.strip().rstrip('.'))

        if recommendations:
            parts.append("🎯 Potential Recommendations:\n")
            parts.extend(f"• {rec}\n" for rec in recommendations[:5])

        if not any([companies, symbols, prices, actions]):
            parts.append("⚠️ No clear stock recommendations found in the transcript.\n")
            parts.append("This might be because:\n")
            parts.append("• The video doesn't contain stock recommendations\n")
            parts.append("• The audio quality was poor\n")
            parts.append("• The content is not in English\n")

        return "".join(parts)
    except Exception as e:
        return f"Error extracting stock info: {str(e)}"
def cleanup_file(file_path):
    """Best-effort removal of a temporary file and its parent directory.

    The file is deleted first; the parent directory is then removed only if
    that leaves it empty. Failures are ignored on purpose because this runs
    during cleanup, but only expected error types are swallowed so that
    ``KeyboardInterrupt``/``SystemExit`` still propagate.

    Args:
        file_path: Path of the file to delete; ``None`` or empty is a no-op.
    """
    if not file_path:
        return

    try:
        os.remove(file_path)
    except (OSError, TypeError, ValueError):
        # Missing file, permission problem, or an unusable path value:
        # nothing was removed, so leave the directory alone as well.
        return

    # Also try to remove the directory if it's empty
    parent_dir = os.path.dirname(file_path)
    if parent_dir:
        try:
            os.rmdir(parent_dir)
        except OSError:
            # Directory still has other entries or cannot be removed.
            pass
def process_cookies_file(cookies_file):
    """Copy an uploaded cookies file to a private temp file and return its path.

    Args:
        cookies_file: The upload, given either as a filesystem path
            (``str``/``os.PathLike``) or as an object whose ``name`` attribute
            holds the path; ``None`` means nothing was uploaded.
            NOTE(review): which of the two forms Gradio passes depends on the
            Gradio version and the ``gr.File`` ``type`` setting - confirm
            against the installed version.

    Returns:
        Path of the temporary copy (suffix ``.txt``), or ``None`` when no file
        was given or the copy failed. Failures are printed, not raised.
    """
    if cookies_file is None:
        return None

    if isinstance(cookies_file, (str, os.PathLike)):
        source_path = os.fspath(cookies_file)
    else:
        source_path = getattr(cookies_file, "name", None)

    temp_cookies_path = None
    try:
        if not source_path:
            raise ValueError("uploaded cookies file has no usable path")

        # mkstemp creates the file atomically; the old mktemp only produced a
        # name, which another process could claim before the copy happened.
        fd, temp_cookies_path = tempfile.mkstemp(suffix='.txt')
        os.close(fd)

        # Copy the uploaded file to temp location
        shutil.copy2(source_path, temp_cookies_path)
        return temp_cookies_path
    except (OSError, TypeError, ValueError) as e:
        print(f"Error processing cookies file: {e}")
        # Do not leave an empty/partial temp file behind on failure.
        if temp_cookies_path is not None:
            try:
                os.remove(temp_cookies_path)
            except OSError:
                pass
        return None
def _is_youtube_url(url):
    """Return True when the URL's host is youtube.com / youtu.be or a subdomain."""
    candidate = url.strip()
    if "://" not in candidate:
        # Accept scheme-less input such as "youtube.com/watch?v=...".
        candidate = "https://" + candidate
    try:
        host = (urlparse(candidate).hostname or "").lower()
    except ValueError:
        return False
    return any(
        host == domain or host.endswith("." + domain)
        for domain in ("youtube.com", "youtu.be")
    )


def process_video(url, cookies_file, progress=gr.Progress()):
    """Download, transcribe and analyse one YouTube video for the UI.

    Steps: validate the URL, stage the optional cookies file, download the
    audio, transcribe it, and extract stock mentions. Temporary audio and
    cookies files are removed in all cases.

    Args:
        url: YouTube video URL entered by the user.
        cookies_file: Optional uploaded cookies file (or ``None``).
        progress: Gradio progress tracker, called with a fraction and a
            description at each step.

    Returns:
        A 3-tuple ``(transcript_or_message, stock_details, status)``. On any
        failure the first item carries the error message, the second is
        empty, and the third is an error status; this function does not raise.
    """
    # Check if required packages are available
    if not YT_DLP_AVAILABLE:
        return "Error: yt-dlp is not installed properly. Please check the requirements.", "", "❌ Error: Missing yt-dlp"
    if not WHISPER_AVAILABLE:
        return "Error: OpenAI Whisper is not installed properly. Please check the requirements.", "", "❌ Error: Missing Whisper"
    if not url or not url.strip():
        return "Please provide a valid YouTube URL", "", "❌ Error: Invalid URL"

    # Surrounding whitespace (common with pasted text) must not reach the downloader.
    url = url.strip()
    audio_path = None
    cookies_temp_path = None
    try:
        # Validate URL by host name; a substring test would also accept
        # unrelated sites that merely mention "youtube.com" in the URL.
        if not _is_youtube_url(url):
            return "Please provide a valid YouTube URL", "", "❌ Error: Invalid URL"

        # Process cookies file if provided
        progress(0.05, desc="Processing cookies...")
        cookies_temp_path = process_cookies_file(cookies_file)
        status_msg = "✅ Cookies loaded" if cookies_temp_path else "⚠️ No cookies (may encounter bot detection)"
        # Logged so it is visible whether the download ran with cookies.
        print(status_msg)

        # Download audio
        progress(0.2, desc="Downloading audio...")
        audio_path = download_audio(url, cookies_temp_path)

        # Transcribe audio
        progress(0.6, desc="Transcribing audio...")
        transcript = transcribe_audio(audio_path)
        if not transcript.strip():
            return "No speech detected in the video", "", "❌ No speech detected"

        # Extract stock information
        progress(0.9, desc="Extracting stock information...")
        stock_details = extract_stock_info_simple(transcript)

        progress(1.0, desc="Complete!")
        return transcript, stock_details, "✅ Processing completed successfully"
    except Exception as e:
        # Top-level UI boundary: report the failure instead of crashing the app.
        error_msg = f"Error processing video: {str(e)}"
        return error_msg, "", f"❌ Error: {str(e)}"
    finally:
        # Clean up temporary files
        cleanup_file(audio_path)
        cleanup_file(cookies_temp_path)
# Create Gradio interface.
# NOTE(review): the source text had lost its indentation, blank lines and
# emoji encoding. The layout nesting below is reconstructed from the order of
# the components, and icons whose original glyph could not be recovered were
# replaced with a fitting one - confirm against the deployed app.
with gr.Blocks(
    title="Stock Recommendation Extractor",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1400px;
        margin: auto;
    }
    .status-box {
        padding: 10px;
        border-radius: 5px;
        margin: 10px 0;
    }
    """
) as demo:
    gr.Markdown("""
    # 📈 Stock Recommendation Extractor from YouTube

    Extract stock recommendations and trading information from YouTube videos using AI transcription.

    **How it works:**
    1. Upload your cookies.txt file (optional but recommended to avoid bot detection)
    2. Paste YouTube video URL
    3. Downloads audio from YouTube video
    4. Transcribes using OpenAI Whisper
    5. Extracts stock-related information

    **⚠️ Disclaimer:** This is for educational purposes only. Always do your own research!
    """)

    # Input controls: cookies upload, URL, trigger button and status line.
    with gr.Row():
        with gr.Column(scale=1):
            # Cookies file upload
            cookies_input = gr.File(
                label="🍪 Upload Cookies File (cookies.txt)",
                file_types=[".txt"],
                file_count="single"
            )
            gr.Markdown("""
            **How to get cookies.txt:**
            1. Install browser extension like "Get cookies.txt LOCALLY"
            2. Visit YouTube in your browser (logged in)
            3. Export cookies for youtube.com
            4. Upload the downloaded cookies.txt file here
            """)
            url_input = gr.Textbox(
                label="📺 YouTube URL",
                placeholder="https://www.youtube.com/watch?v=...",
                lines=2
            )
            process_btn = gr.Button(
                "🚀 Extract Stock Information",
                variant="primary",
                size="lg"
            )
            # Status display
            status_output = gr.Textbox(
                label="📊 Status",
                lines=1,
                interactive=False
            )
            gr.Markdown("""
            ### 💡 Tips:
            - Upload cookies.txt to avoid bot detection
            - Works best with financial YouTube channels
            - Ensure video has clear audio
            - English content works best
            """)

    # Results: transcript on the left, extracted stock details on the right.
    with gr.Row():
        with gr.Column():
            transcript_output = gr.Textbox(
                label="📝 Full Transcript",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )
        with gr.Column():
            stock_info_output = gr.Textbox(
                label="📊 Extracted Stock Information",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )

    # Event handlers: the three outputs receive process_video's 3-tuple in order.
    process_btn.click(
        fn=process_video,
        inputs=[url_input, cookies_input],
        outputs=[transcript_output, stock_info_output, status_output],
        show_progress=True
    )

    # Example section
    gr.Markdown("### 📋 Example URLs (Replace with actual financial videos)")
    gr.Examples(
        examples=[
            ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"],
        ],
        inputs=[url_input],
        label="Click to try example"
    )

    gr.Markdown("""
    ### 🔧 Troubleshooting:
    - **Bot Detection Error**: Upload your cookies.txt file
    - **No Audio Found**: Check if video has audio track
    - **Transcription Failed**: Video might be too long or audio quality poor
    - **No Stock Info**: Video might not contain financial content
    """)

# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()