# Page header captured along with the file (not program code): developer28's picture
# Update app.py
# 5a12060 verified
import os
import gradio as gr
import tempfile
import shutil
import re
import traceback
from yt_dlp import YoutubeDL
# Optional dependency: OpenAI Whisper is used for transcription when present.
# WHISPER_AVAILABLE tells the rest of the module whether it may be used.
try:
    import whisper
    WHISPER_AVAILABLE = True
except Exception:
    # Catch Exception instead of using a bare ``except`` so KeyboardInterrupt
    # and SystemExit raised during the import still propagate, while any
    # ordinary import-time failure leaves the app running without Whisper.
    whisper = None  # keep the name bound so later references fail clearly
    WHISPER_AVAILABLE = False
# Download audio from YouTube
def download_audio(url, cookies_path=None):
    """Download the audio track of ``url`` into a fresh temporary directory.

    Args:
        url: Video URL handed to yt-dlp.
        cookies_path: Optional path to a cookies file, passed to yt-dlp as
            ``cookiefile``.

    Returns:
        A ``(path, message)`` tuple. ``path`` is the downloaded file, or
        ``None`` when the download raised or no output file was found;
        ``message`` is a human-readable status string. On success the file
        lives in its own temporary directory, which the caller is expected
        to remove when done. On failure that directory is removed here.
    """
    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, "audio")
        # NOTE(review): 'user_agent', 'referer' and 'force_ipv4' look like
        # CLI-style names; yt-dlp's Python API appears to expect
        # 'http_headers' and 'source_address' instead - confirm these keys
        # are actually honoured before relying on them.
        ydl_opts = {
            'format': 'bestaudio[ext=m4a]/bestaudio/best',
            'outtmpl': output_path + '.%(ext)s',
            'quiet': True,
            'noplaylist': True,
            'cookiefile': cookies_path if cookies_path else None,
            'user_agent': 'Mozilla/5.0',
            'referer': 'https://www.youtube.com/',
            'force_ipv4': True,
        }
        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Prefer the usual extensions in their original order, then accept
        # any other "audio.<ext>" file: the format selector above can fall
        # back to containers that are not in the short list.
        candidates = [output_path + ext for ext in (".m4a", ".webm", ".mp3")]
        candidates += sorted(
            os.path.join(temp_dir, name)
            for name in os.listdir(temp_dir)
            if name.startswith("audio.")
            and not name.endswith((".part", ".ytdl"))
        )
        for final_path in candidates:
            if os.path.isfile(final_path):
                return final_path, "βœ… Audio downloaded successfully"

        # Nothing usable was produced: do not leave the directory behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
        return None, "❌ Audio file not found"
    except Exception as e:
        traceback.print_exc()
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        return None, f"❌ Download error: {str(e)}"
# Transcribe using Whisper
def transcribe_audio(path):
    """Return the Whisper transcript of the audio file at ``path``.

    Problems are reported through the return value rather than raised: when
    Whisper is unavailable or transcription fails, the returned string is an
    error message that starts with the cross-mark emoji.
    """
    if WHISPER_AVAILABLE:
        try:
            # The "tiny.en" model is loaded on every call.
            transcription = whisper.load_model("tiny.en").transcribe(path)
            return transcription["text"]
        except Exception as err:
            traceback.print_exc()
            return f"❌ Transcription failed: {str(err)}"
    return "❌ Whisper not available. Please install openai-whisper."
# Extract stock insights
def extract_stock_info(text):
    """Build a plain-text report of stock-related mentions found in ``text``.

    The report lists capitalised word runs ("companies"), 2-5 letter
    upper-case tokens ("symbols"), dollar amounts and trading action words,
    followed by up to five sentences that contain a recommendation keyword.

    Each list shows at most ten distinct values in order of first
    appearance, so the same input always yields the same report. Action
    words are lower-cased so "Buy" and "buy" are listed once.

    Args:
        text: Transcript text to scan.

    Returns:
        The report string, or an error message starting with the cross-mark
        emoji if extraction raised (for example when ``text`` is not a
        string).
    """
    def _unique(items, limit=10):
        # dict.fromkeys de-duplicates while keeping first-seen order; a set
        # would make the order vary between runs. De-duplicate before
        # truncating so repeats do not crowd out distinct values.
        return list(dict.fromkeys(items))[:limit]

    try:
        companies = re.findall(r'\b[A-Z][a-z]+(?: [A-Z][a-z]+)*\b', text)
        symbols = re.findall(r'\b[A-Z]{2,5}\b', text)
        prices = re.findall(r'\$\d+(?:\.\d{1,2})?', text)
        actions = [
            action.lower()
            for action in re.findall(
                r'\b(buy|sell|hold|target|bullish|bearish|stop loss|accumulate|short|take profit|entry|exit)\b',
                text,
                re.IGNORECASE,
            )
        ]

        parts = ["=== STOCK RECOMMENDATION ANALYSIS ===\n\n"]
        if companies:
            parts.append(f"🏒 Companies Mentioned: {', '.join(_unique(companies))}\n")
        if symbols:
            parts.append(f"πŸ”  Symbols: {', '.join(_unique(symbols))}\n")
        if prices:
            parts.append(f"πŸ’² Prices: {', '.join(_unique(prices))}\n")
        if actions:
            parts.append(f"πŸ“Š Actions: {', '.join(_unique(actions))}\n")

        # Substring matching is deliberate here: it also catches inflected
        # forms such as "buying" or "targets".
        keywords = ('buy', 'sell', 'target', 'hold', 'accumulate', 'short', 'entry', 'exit')
        recommendations = [
            sentence.strip()
            for sentence in text.split(".")
            if any(word in sentence.lower() for word in keywords)
        ]
        if recommendations:
            parts.append("\n🎯 Potential Recommendations:\n")
            parts.extend(f"β€’ {r}\n" for r in recommendations[:5])

        if not (companies or symbols or prices or actions):
            parts.append("\n⚠️ No stock-related insights detected.")
        return "".join(parts)
    except Exception as e:
        return f"❌ Stock info extraction failed: {str(e)}"
# Save cookies
def save_cookies(file):
    """Copy an uploaded cookies file to a private temporary ``.txt`` file.

    Args:
        file: ``None``, a filesystem path, or an object with a ``read()``
            method whose content is the cookies file.

    Returns:
        Path of the temporary copy, or ``None`` when no file was given or
        the copy failed (the failure is printed, not raised).
    """
    if file is None:
        return None

    # mkstemp creates the file itself, readable and writable only by the
    # current user; tempfile.mktemp merely returns a name, which is
    # deprecated because another process can claim it first.
    fd, temp_path = tempfile.mkstemp(suffix=".txt")
    try:
        with os.fdopen(fd, "wb") as dst:
            if hasattr(file, "read"):
                data = file.read()
                if isinstance(data, str):
                    # Text-mode file objects return str; the target is binary.
                    data = data.encode("utf-8")
                dst.write(data)
            else:
                with open(file, "rb") as src:
                    shutil.copyfileobj(src, dst)
        return temp_path
    except Exception as e:
        print(f"❌ Failed to handle cookies.txt: {e}")
        # Do not leave an empty or partial cookies file behind.
        try:
            os.remove(temp_path)
        except OSError:
            pass
        return None
# Trim audio to a shorter length (2 minutes by default) for CPU speed
import subprocess


def trim_audio(input_path, output_path, duration_sec=120):
    """Write the first ``duration_sec`` seconds of ``input_path`` to ``output_path``.

    Runs the ``ffmpeg`` executable. Streams are copied without re-encoding
    only when both paths share the same extension; otherwise ffmpeg is left
    to encode for the output container, because copying a stream into a
    different container frequently fails.

    Args:
        input_path: Source audio file.
        output_path: Destination file (overwritten if it exists).
        duration_sec: Length of the clip in seconds.

    Returns:
        ``output_path`` when ffmpeg succeeded and produced a non-empty file,
        otherwise ``input_path`` so the caller can fall back to the
        untrimmed audio.
    """
    try:
        command = [
            "ffmpeg", "-y", "-i", input_path,
            "-t", str(duration_sec),  # duration in seconds
        ]
        same_extension = (
            os.path.splitext(str(input_path))[1].lower()
            == os.path.splitext(str(output_path))[1].lower()
        )
        if same_extension:
            command += ["-c", "copy"]
        command.append(output_path)

        completed = subprocess.run(
            command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE
        )
        # subprocess.run does not raise on a non-zero exit status unless
        # check=True, so the result has to be inspected explicitly.
        if completed.returncode != 0:
            detail = completed.stderr.decode("utf-8", errors="replace").strip()
            reason = (
                detail.splitlines()[-1]
                if detail
                else f"ffmpeg exited with code {completed.returncode}"
            )
            print("❌ Error trimming audio:", reason)
            return input_path
        if not os.path.isfile(output_path) or os.path.getsize(output_path) == 0:
            print("❌ Error trimming audio:", "ffmpeg produced no output")
            return input_path
        return output_path
    except Exception as e:
        # Typically raised when the ffmpeg executable cannot be started.
        print("❌ Error trimming audio:", e)
        return input_path
# YouTube flow
def run_pipeline(url, cookies_file, show_transcript):
    """Download a video's audio, transcribe a short clip and extract stock info.

    Args:
        url: Video URL; required.
        cookies_file: Optional uploaded cookies file forwarded to the
            downloader (``None`` when absent).
        show_transcript: When true, the transcript is prepended to the
            stock report in the result.

    Returns:
        A ``(status, result)`` pair of strings. On any failure ``status``
        carries the error message and ``result`` is empty; unexpected
        exceptions are reported with their traceback instead of raised.
    """
    temp_files = []          # scratch files to delete once the request ends
    downloaded_audio = None  # downloaded file, removed with its directory
    try:
        if not WHISPER_AVAILABLE:
            return "❌ Whisper not installed", ""
        if not url:
            return "❌ YouTube URL required", ""

        cookie_path = save_cookies(cookies_file)
        if cookie_path:
            temp_files.append(cookie_path)

        audio_path, status = download_audio(url, cookie_path)
        if not audio_path:
            return status, ""
        downloaded_audio = audio_path

        # Trim audio to 2 minutes before transcription. The clip keeps the
        # source extension so the trimmer's stream copy targets the same
        # container, and mkstemp replaces the deprecated tempfile.mktemp.
        suffix = os.path.splitext(audio_path)[1] or ".mp3"
        fd, trimmed_path = tempfile.mkstemp(suffix=suffix)
        os.close(fd)
        temp_files.append(trimmed_path)

        clip_path = trim_audio(audio_path, trimmed_path)
        # Fall back to the full download if trimming left nothing usable.
        if (
            not clip_path
            or not os.path.isfile(clip_path)
            or os.path.getsize(clip_path) == 0
        ):
            clip_path = audio_path

        transcript = transcribe_audio(clip_path)
        if transcript.startswith("❌"):
            return transcript, ""

        stock_info = extract_stock_info(transcript)
        if show_transcript:
            return "βœ… Complete", f"πŸ“œ Transcript:\n\n{transcript}\n\n\n{stock_info}"
        else:
            return "βœ… Complete", stock_info
    except Exception:
        tb = traceback.format_exc()
        print(tb)
        return f"❌ Unhandled Error:\n{tb}", ""
    finally:
        # Best-effort cleanup: failing to delete scratch data must never
        # mask the result that is being returned.
        if downloaded_audio:
            temp_files.append(downloaded_audio)
        for leftover in temp_files:
            try:
                os.remove(leftover)
            except OSError:
                pass
        if downloaded_audio:
            try:
                # rmdir only succeeds on an empty directory, so nothing
                # other than the download itself can be deleted here.
                os.rmdir(os.path.dirname(downloaded_audio))
            except OSError:
                pass
# Audio upload flow
def run_pipeline_audio(audio_file, show_transcript):
    """Transcribe a short clip of an uploaded audio file and extract stock info.

    Args:
        audio_file: The upload, either a filesystem path or an object with
            a ``read()`` method; ``None`` when nothing was uploaded.
        show_transcript: When true, the transcript is prepended to the
            stock report in the result.

    Returns:
        A ``(status, result)`` pair of strings. On any failure ``status``
        carries the error message and ``result`` is empty; unexpected
        exceptions are reported with their traceback instead of raised.
    """
    temp_files = []  # scratch files to delete once the request ends
    try:
        if not WHISPER_AVAILABLE:
            return "❌ Whisper not installed", ""
        if audio_file is None:
            return "❌ No audio file uploaded", ""

        # Derive the extension from the object's ``name`` when it has one;
        # str() of a file object is its repr, not a path.
        source_name = str(getattr(audio_file, "name", audio_file))
        suffix = os.path.splitext(source_name)[-1]

        # Save uploaded file (mkstemp replaces the deprecated mktemp).
        fd, temp_audio_path = tempfile.mkstemp(suffix=suffix)
        temp_files.append(temp_audio_path)
        with os.fdopen(fd, "wb") as dst:
            if hasattr(audio_file, "read"):
                dst.write(audio_file.read())
            else:
                with open(str(audio_file), "rb") as src:
                    shutil.copyfileobj(src, dst)

        # Trim audio to 2 minutes. The clip keeps the upload's extension so
        # the trimmer's stream copy targets the same container.
        fd, trimmed_path = tempfile.mkstemp(suffix=suffix or ".mp3")
        os.close(fd)
        temp_files.append(trimmed_path)

        clip_path = trim_audio(temp_audio_path, trimmed_path)
        # Fall back to the full upload if trimming left nothing usable.
        if (
            not clip_path
            or not os.path.isfile(clip_path)
            or os.path.getsize(clip_path) == 0
        ):
            clip_path = temp_audio_path

        transcript = transcribe_audio(clip_path)
        if transcript.startswith("❌"):
            return transcript, ""

        stock_info = extract_stock_info(transcript)
        if show_transcript:
            return "βœ… Complete", f"πŸ“œ Transcript:\n\n{transcript}\n\n\n{stock_info}"
        else:
            return "βœ… Complete", stock_info
    except Exception:
        tb = traceback.format_exc()
        print(tb)
        return f"❌ Unhandled Error:\n{tb}", ""
    finally:
        # Best-effort cleanup: failing to delete scratch data must never
        # mask the result that is being returned.
        for leftover in temp_files:
            try:
                os.remove(leftover)
            except OSError:
                pass
# Gradio UI: one tab per input source, each wired to its pipeline function.
# NOTE(review): the nesting below was reconstructed from a copy that had lost
# its indentation - confirm which components belong inside the Row.
with gr.Blocks(title="Stock Insights from YouTube or Audio") as demo:
    gr.Markdown("""
# πŸ“ˆ Extract Stock Recommendations from YouTube or Uploaded Audio
Upload a YouTube video or audio file. We'll transcribe it using Whisper and extract stock insights.
""")
    # Tab 1: URL plus optional cookies file -> run_pipeline
    with gr.Tab("πŸ“Ί From YouTube Video"):
        with gr.Row():
            url_input = gr.Textbox(label="πŸŽ₯ YouTube URL")
            cookie_input = gr.File(label="cookies.txt (optional)", file_types=[".txt"])
        show_transcript_yt = gr.Checkbox(label="Show Transcript", value=False)
        yt_run_btn = gr.Button("πŸš€ Extract from YouTube")
        yt_status = gr.Textbox(label="Status")
        yt_result = gr.Textbox(label="Transcript & Stock Info", lines=15)
        # The inputs list matches run_pipeline's parameters in order; its two
        # return values fill the status and result boxes.
        yt_run_btn.click(fn=run_pipeline, inputs=[url_input, cookie_input, show_transcript_yt], outputs=[yt_status, yt_result])
    # Tab 2: uploaded audio file -> run_pipeline_audio
    with gr.Tab("🎡 From Uploaded Audio"):
        audio_input = gr.File(label="Upload Audio File", file_types=[".mp3", ".wav", ".m4a", ".webm"])
        show_transcript_audio = gr.Checkbox(label="Show Transcript", value=False)
        audio_run_btn = gr.Button("πŸš€ Extract from Audio")
        audio_status = gr.Textbox(label="Status")
        audio_result = gr.Textbox(label="Transcript & Stock Info", lines=15)
        # Same wiring as the YouTube tab, for run_pipeline_audio.
        audio_run_btn.click(fn=run_pipeline_audio, inputs=[audio_input, show_transcript_audio], outputs=[audio_status, audio_result])
# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch(debug=True)