"""Gradio app: extract stock-related mentions from YouTube or uploaded audio.

Pipeline: obtain an audio file (yt-dlp download or user upload), trim it to a
short clip with ffmpeg, transcribe the clip with OpenAI Whisper, then run
regex heuristics over the transcript.

Error convention used throughout: helper functions return a string starting
with "❌" instead of raising, and the pipeline entry points always return a
``(status, result)`` tuple for the two Gradio output textboxes.
"""

import os
import re
import shutil
import subprocess
import tempfile
import traceback

import gradio as gr
from yt_dlp import YoutubeDL

# Optional: use OpenAI Whisper if available.
try:
    import whisper
    WHISPER_AVAILABLE = True
except Exception:
    # Deliberately broader than ImportError (a broken install can fail in
    # other ways), but not a bare ``except`` so Ctrl-C / SystemExit propagate.
    WHISPER_AVAILABLE = False

# Extensions probed first when locating the downloaded file, in this order.
_PREFERRED_AUDIO_EXTS = (".m4a", ".webm", ".mp3")

# Keywords whose presence marks a sentence as a potential recommendation.
_RECOMMENDATION_WORDS = (
    'buy', 'sell', 'target', 'hold', 'accumulate', 'short', 'entry', 'exit',
)


def _make_temp_file(suffix=""):
    """Create an empty temp file securely and return its path.

    Replaces ``tempfile.mktemp``, which only returns a name and leaves a
    window in which another process can create that path first.
    """
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    return path


def _remove_quietly(*paths):
    """Best-effort deletion of temp files; falsy entries are skipped."""
    for path in paths:
        if not path:
            continue
        try:
            os.remove(path)
        except OSError:
            # Cleanup must never mask the real result of the pipeline.
            pass


def _find_downloaded_file(directory, stem):
    """Return the path of ``<stem>.<ext>`` inside *directory*, or None.

    Files with a preferred audio extension win; otherwise any extension is
    accepted. Partial downloads such as ``audio.m4a.part`` are excluded
    because their stem is ``audio.m4a``, not ``audio``.
    """
    def rank(name):
        ext = os.path.splitext(name)[1]
        if ext in _PREFERRED_AUDIO_EXTS:
            return _PREFERRED_AUDIO_EXTS.index(ext)
        return len(_PREFERRED_AUDIO_EXTS)

    candidates = [
        name for name in sorted(os.listdir(directory))
        if os.path.splitext(name)[0] == stem
        and os.path.isfile(os.path.join(directory, name))
    ]
    if not candidates:
        return None
    candidates.sort(key=rank)  # stable: alphabetical order breaks ties
    return os.path.join(directory, candidates[0])


# Download audio from YouTube
def download_audio(url, cookies_path=None):
    """Download the best available audio stream of *url* with yt-dlp.

    Args:
        url: Video URL handed to yt-dlp.
        cookies_path: Optional path to a cookies.txt file.

    Returns:
        ``(path, status)``. On success *path* is the downloaded file inside a
        fresh temporary directory (the caller should delete that directory
        when done) and *status* is a success message. On failure *path* is
        None, *status* starts with "❌", and the directory is already removed.
    """
    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, "audio")
        ydl_opts = {
            'format': 'bestaudio[ext=m4a]/bestaudio/best',
            'outtmpl': output_path + '.%(ext)s',
            'quiet': True,
            'noplaylist': True,
            'cookiefile': cookies_path or None,
            # NOTE(review): the three keys below mirror yt-dlp CLI flag names;
            # confirm the YoutubeDL API honours them (the documented API
            # options appear to be 'http_headers' and 'source_address').
            'user_agent': 'Mozilla/5.0',
            'referer': 'https://www.youtube.com/',
            'force_ipv4': True,
        }

        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # The 'bestaudio/best' fallback may yield containers other than
        # m4a/webm/mp3, so look at what was actually written.
        final_path = _find_downloaded_file(temp_dir, "audio")
        if final_path:
            return final_path, "✅ Audio downloaded successfully"

        shutil.rmtree(temp_dir, ignore_errors=True)
        return None, "❌ Audio file not found"
    except Exception as e:
        traceback.print_exc()
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)
        return None, f"❌ Download error: {str(e)}"


# Transcribe using Whisper
def transcribe_audio(path):
    """Transcribe the audio file at *path* with Whisper's ``tiny.en`` model.

    Returns:
        The transcript text, or a message starting with "❌" when Whisper is
        unavailable or transcription raises.
    """
    if not WHISPER_AVAILABLE:
        return "❌ Whisper not available. Please install openai-whisper."

    try:
        model = whisper.load_model("tiny.en")
        result = model.transcribe(path)
        return result["text"]
    except Exception as e:
        traceback.print_exc()
        return f"❌ Transcription failed: {str(e)}"


def _unique_head(items, limit=10):
    """Return up to *limit* distinct items, keeping first-seen order."""
    return list(dict.fromkeys(items))[:limit]


# Extract stock insights
def extract_stock_info(text):
    """Build a plain-text report of stock-related mentions found in *text*.

    Purely regex-based heuristics: capitalised word runs are reported as
    companies, 2-5 letter all-caps tokens as symbols, ``$``-amounts as prices,
    and a fixed keyword list as actions. Sentences containing a recommendation
    keyword (substring match) are listed, at most five.

    Returns:
        The report, or a message starting with "❌" if extraction raises
        (for example when *text* is not a string).
    """
    try:
        companies = re.findall(r'\b[A-Z][a-z]+(?: [A-Z][a-z]+)*\b', text)
        symbols = re.findall(r'\b[A-Z]{2,5}\b', text)
        prices = re.findall(r'\$\d+(?:\.\d{1,2})?', text)
        actions = re.findall(
            r'\b(buy|sell|hold|target|bullish|bearish|stop loss|accumulate|short|take profit|entry|exit)\b',
            text,
            re.IGNORECASE,
        )
        # Case-insensitive matches would otherwise list "Buy" and "buy" twice.
        actions = [action.lower() for action in actions]

        parts = ["=== STOCK RECOMMENDATION ANALYSIS ===\n\n"]

        # De-duplicate before truncating, and keep first-seen order so the
        # report is deterministic (a set has arbitrary iteration order).
        if companies:
            parts.append(f"🏢 Companies Mentioned: {', '.join(_unique_head(companies))}\n")
        if symbols:
            parts.append(f"🔠 Symbols: {', '.join(_unique_head(symbols))}\n")
        if prices:
            parts.append(f"💲 Prices: {', '.join(_unique_head(prices))}\n")
        if actions:
            parts.append(f"📊 Actions: {', '.join(_unique_head(actions))}\n")

        # Split on periods that are not decimal points, so "$12.50" stays
        # inside its sentence.
        recommendations = []
        for sentence in re.split(r'\.(?!\d)', text):
            lowered = sentence.lower()
            if any(word in lowered for word in _RECOMMENDATION_WORDS):
                recommendations.append(sentence.strip())

        if recommendations:
            parts.append("\n🎯 Potential Recommendations:\n")
            parts.extend(f"• {r}\n" for r in recommendations[:5])

        if not any([companies, symbols, prices, actions]):
            parts.append("\n⚠️ No stock-related insights detected.")

        return "".join(parts)
    except Exception as e:
        return f"❌ Stock info extraction failed: {str(e)}"


# Save cookies
def save_cookies(file):
    """Copy an uploaded cookies file to a private temp file.

    Args:
        file: None, a file-like object with ``read()``, or a filesystem path.

    Returns:
        Path of the temp copy (the caller should delete it), or None when no
        file was given or copying failed.
    """
    if file is None:
        return None

    temp_path = None
    try:
        temp_path = _make_temp_file(suffix=".txt")
        if hasattr(file, "read"):
            data = file.read()
            if isinstance(data, str):
                # A text-mode handle returns str; the copy is written as bytes.
                data = data.encode("utf-8")
            with open(temp_path, "wb") as f:
                f.write(data)
        else:
            shutil.copy(file, temp_path)
        return temp_path
    except Exception as e:
        print(f"❌ Failed to handle cookies.txt: {e}")
        _remove_quietly(temp_path)
        return None


# ✅ Trim audio to shorter length (2 minutes) for CPU speed
def trim_audio(input_path, output_path, duration_sec=120):
    """Write the first *duration_sec* seconds of *input_path* to *output_path*.

    Uses ffmpeg stream copy (no re-encoding), so *output_path* must use a
    container that can hold the input's codec; the same extension as the
    input is the safe choice.

    Returns:
        *output_path* when ffmpeg succeeded and produced a non-empty file,
        otherwise *input_path* so callers can fall back to the untrimmed
        audio. Callers must use the returned path.
    """
    command = [
        "ffmpeg",
        "-y",
        "-i", input_path,
        "-t", str(duration_sec),  # duration in seconds
        "-c", "copy",
        output_path,
    ]
    try:
        completed = subprocess.run(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
    except Exception as e:
        # Typically FileNotFoundError when ffmpeg is not installed.
        print("❌ Error trimming audio:", e)
        return input_path

    produced = os.path.isfile(output_path) and os.path.getsize(output_path) > 0
    if completed.returncode != 0 or not produced:
        stderr_tail = completed.stderr.decode("utf-8", errors="replace")[-500:]
        print("❌ Error trimming audio:", stderr_tail)
        return input_path
    return output_path


def _analyze_audio(audio_path, show_transcript):
    """Trim, transcribe and analyse *audio_path*; return ``(status, result)``.

    Shared tail of both pipelines. The temporary trimmed clip is always
    deleted; *audio_path* itself is left for the caller to clean up.
    """
    # Keep the input's extension so ffmpeg's stream copy targets a container
    # that can hold the codec.
    suffix = os.path.splitext(audio_path)[1] or ".mp3"
    trimmed_path = _make_temp_file(suffix=suffix)
    try:
        # ⏱ Trim audio to 2 minutes before transcription; falls back to the
        # untrimmed file if trimming failed.
        usable_path = trim_audio(audio_path, trimmed_path)
        transcript = transcribe_audio(usable_path)
    finally:
        _remove_quietly(trimmed_path)

    if transcript.startswith("❌"):
        return transcript, ""

    stock_info = extract_stock_info(transcript)
    if show_transcript:
        return "✅ Complete", f"📜 Transcript:\n\n{transcript}\n\n\n{stock_info}"
    return "✅ Complete", stock_info


# YouTube flow
def run_pipeline(url, cookies_file, show_transcript):
    """Gradio handler: download audio for *url* and analyse it.

    Returns:
        ``(status, result)`` strings. Never raises; unexpected errors are
        returned as a traceback in *status*.
    """
    cookie_path = None
    download_dir = None
    try:
        if not WHISPER_AVAILABLE:
            return "❌ Whisper not installed", ""

        url = (url or "").strip()
        if not url:
            return "❌ YouTube URL required", ""

        cookie_path = save_cookies(cookies_file)
        audio_path, status = download_audio(url, cookie_path)
        if not audio_path:
            return status, ""

        # download_audio places the file in its own fresh temp directory.
        download_dir = os.path.dirname(audio_path)
        return _analyze_audio(audio_path, show_transcript)
    except Exception:
        tb = traceback.format_exc()
        print(tb)
        return f"❌ Unhandled Error:\n{tb}", ""
    finally:
        _remove_quietly(cookie_path)
        if download_dir:
            shutil.rmtree(download_dir, ignore_errors=True)


# Audio upload flow
def run_pipeline_audio(audio_file, show_transcript):
    """Gradio handler: analyse an uploaded audio file.

    Args:
        audio_file: None, a file-like object with ``read()``, or a path.
        show_transcript: Whether to include the transcript in the result.

    Returns:
        ``(status, result)`` strings. Never raises; unexpected errors are
        returned as a traceback in *status*.
    """
    temp_audio_path = None
    try:
        if not WHISPER_AVAILABLE:
            return "❌ Whisper not installed", ""

        if audio_file is None:
            return "❌ No audio file uploaded", ""

        # Save uploaded file. Take the extension from the upload's file name;
        # str() of a file object is its repr, not a path.
        source_name = str(getattr(audio_file, "name", audio_file))
        temp_audio_path = _make_temp_file(suffix=os.path.splitext(source_name)[1])
        if hasattr(audio_file, "read"):
            with open(temp_audio_path, "wb") as f:
                f.write(audio_file.read())
        else:
            shutil.copy(str(audio_file), temp_audio_path)

        return _analyze_audio(temp_audio_path, show_transcript)
    except Exception:
        tb = traceback.format_exc()
        print(tb)
        return f"❌ Unhandled Error:\n{tb}", ""
    finally:
        _remove_quietly(temp_audio_path)


# Gradio UI
with gr.Blocks(title="Stock Insights from YouTube or Audio") as demo:
    gr.Markdown("""
    # 📈 Extract Stock Recommendations from YouTube or Uploaded Audio
    Upload a YouTube video or audio file. We'll transcribe it using Whisper and extract stock insights.
    """)

    with gr.Tab("📺 From YouTube Video"):
        with gr.Row():
            url_input = gr.Textbox(label="🎥 YouTube URL")
            cookie_input = gr.File(label="cookies.txt (optional)", file_types=[".txt"])
        show_transcript_yt = gr.Checkbox(label="Show Transcript", value=False)
        yt_run_btn = gr.Button("🚀 Extract from YouTube")
        yt_status = gr.Textbox(label="Status")
        yt_result = gr.Textbox(label="Transcript & Stock Info", lines=15)

        yt_run_btn.click(
            fn=run_pipeline,
            inputs=[url_input, cookie_input, show_transcript_yt],
            outputs=[yt_status, yt_result],
        )

    with gr.Tab("🎵 From Uploaded Audio"):
        audio_input = gr.File(
            label="Upload Audio File",
            file_types=[".mp3", ".wav", ".m4a", ".webm"],
        )
        show_transcript_audio = gr.Checkbox(label="Show Transcript", value=False)
        audio_run_btn = gr.Button("🚀 Extract from Audio")
        audio_status = gr.Textbox(label="Status")
        audio_result = gr.Textbox(label="Transcript & Stock Info", lines=15)

        audio_run_btn.click(
            fn=run_pipeline_audio,
            inputs=[audio_input, show_transcript_audio],
            outputs=[audio_status, audio_result],
        )

if __name__ == "__main__":
    demo.launch(debug=True)