# Spaces: Sleeping  (hosting-page status text captured with the file; not part of the program)
import os
import re
import shutil
import subprocess
import tempfile
import traceback

import gradio as gr
from yt_dlp import YoutubeDL
# Optional dependency: Whisper is only needed for transcription. When it cannot
# be imported the flag below is False and the rest of the module still loads.
try:
    import whisper
    WHISPER_AVAILABLE = True
except Exception:
    # Deliberately not a bare ``except``: importing Whisper can fail for reasons
    # other than ImportError, but KeyboardInterrupt/SystemExit must propagate.
    WHISPER_AVAILABLE = False
# Download audio from YouTube
def download_audio(url, cookies_path=None):
    """Download the audio track of ``url`` into a fresh temporary directory.

    Args:
        url: Video URL handed to ``YoutubeDL.download``.
        cookies_path: Optional path to a cookies file, passed to yt-dlp as
            ``cookiefile``. Falsy values mean "no cookies".

    Returns:
        A ``(path, message)`` tuple. ``path`` is the downloaded file on
        success and ``None`` on failure; ``message`` is a human-readable
        status. Ordinary errors are caught and reported through the message
        instead of being raised.
    """
    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, "audio")
        # NOTE(review): confirm that yt-dlp's Python API honours the
        # 'user_agent', 'referer' and 'force_ipv4' keys as spelled here.
        ydl_opts = {
            'format': 'bestaudio[ext=m4a]/bestaudio/best',
            'outtmpl': output_path + '.%(ext)s',
            'quiet': True,
            'noplaylist': True,
            'cookiefile': cookies_path or None,
            'user_agent': 'Mozilla/5.0',
            'referer': 'https://www.youtube.com/',
            'force_ipv4': True,
        }
        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # The format selector may fall back to a container other than the
        # three historically probed ones, so look at what was actually
        # written. The original extensions keep their priority.
        preferred = [output_path + ext for ext in (".m4a", ".webm", ".mp3")]
        others = sorted(
            os.path.join(temp_dir, name)
            for name in os.listdir(temp_dir)
            if name.startswith("audio.") and not name.endswith((".part", ".ytdl"))
        )
        for final_path in preferred + others:
            if os.path.isfile(final_path):
                return final_path, "β Audio downloaded successfully"

        # Nothing usable was produced: do not leave the directory behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
        return None, "β Audio file not found"
    except Exception as e:
        traceback.print_exc()
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        return None, f"β Download error: {str(e)}"
# Transcribe using Whisper
def transcribe_audio(path):
    """Transcribe the audio file at ``path`` with Whisper's ``tiny.en`` model.

    Returns the transcript text on success. When Whisper is unavailable or
    transcription raises, an error message string is returned instead of
    raising (the traceback is printed for the latter case).
    """
    if WHISPER_AVAILABLE:
        try:
            transcription = whisper.load_model("tiny.en").transcribe(path)
            return transcription["text"]
        except Exception as err:
            traceback.print_exc()
            return f"β Transcription failed: {str(err)}"
    return "β Whisper not available. Please install openai-whisper."
# Extract stock insights
def _unique_in_order(items, limit=10):
    """Return up to ``limit`` distinct items, keeping first-seen order."""
    return list(dict.fromkeys(items))[:limit]


def extract_stock_info(text):
    """Build a plain-text report of stock-related mentions found in ``text``.

    The report lists capitalised word runs ("companies"), 2-5 letter
    upper-case tokens ("symbols"), dollar amounts, and action keywords,
    followed by up to five sentences that contain an action keyword.

    Args:
        text: Transcript to scan.

    Returns:
        The formatted report. If extraction raises (for example ``text`` is
        not a string) an error message string is returned instead.
    """
    try:
        companies = re.findall(r'\b[A-Z][a-z]+(?: [A-Z][a-z]+)*\b', text)
        symbols = re.findall(r'\b[A-Z]{2,5}\b', text)
        prices = re.findall(r'\$\d+(?:\.\d{1,2})?', text)
        # Case-fold so "Buy" and "buy" are reported once.
        actions = [
            match.lower()
            for match in re.findall(
                r'\b(buy|sell|hold|target|bullish|bearish|stop loss|accumulate|short|take profit|entry|exit)\b',
                text,
                re.IGNORECASE,
            )
        ]

        # De-duplicate with first-seen order so the report is deterministic
        # (a set's iteration order for strings varies between runs).
        parts = ["=== STOCK RECOMMENDATION ANALYSIS ===\n\n"]
        if companies:
            parts.append(f"π’ Companies Mentioned: {', '.join(_unique_in_order(companies))}\n")
        if symbols:
            parts.append(f"π Symbols: {', '.join(_unique_in_order(symbols))}\n")
        if prices:
            parts.append(f"π² Prices: {', '.join(_unique_in_order(prices))}\n")
        if actions:
            parts.append(f"π Actions: {', '.join(_unique_in_order(actions))}\n")

        # Whole-word match: "household" must not count as "hold".
        keyword = re.compile(
            r'\b(?:buy|sell|target|hold|accumulate|short|entry|exit)\b',
            re.IGNORECASE,
        )
        # Split on periods, but not on the decimal point inside "$12.50".
        sentences = (piece.strip() for piece in re.split(r'\.(?!\d)', text))
        recommendations = [s for s in sentences if keyword.search(s)]
        if recommendations:
            parts.append("\nπ― Potential Recommendations:\n")
            parts.extend(f"β’ {r}\n" for r in recommendations[:5])

        if not any([companies, symbols, prices, actions]):
            parts.append("\nβ οΈ No stock-related insights detected.")
        return "".join(parts)
    except Exception as e:
        return f"β Stock info extraction failed: {str(e)}"
# Save cookies
def save_cookies(file):
    """Copy an uploaded cookies file to a private temporary ``.txt`` file.

    Args:
        file: ``None``, a filesystem path, or an object with a ``read()``
            method returning the file content as bytes or text.

    Returns:
        Path of the temporary copy, or ``None`` when no file was supplied or
        the copy failed (the failure is printed, not raised).
    """
    if file is None:
        return None
    temp_path = None
    try:
        payload = None
        if hasattr(file, "read"):
            payload = file.read()
            if isinstance(payload, str):
                # Text-mode handles return str; the copy is written as bytes.
                payload = payload.encode("utf-8")
        # mkstemp creates the file atomically, unlike the deprecated mktemp,
        # which only returns a name that another process could claim first.
        fd, temp_path = tempfile.mkstemp(suffix=".txt")
        if payload is not None:
            with os.fdopen(fd, "wb") as f:
                f.write(payload)
        else:
            os.close(fd)
            shutil.copy(file, temp_path)
        return temp_path
    except Exception as e:
        print(f"β Failed to handle cookies.txt: {e}")
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass
        return None
# Trim audio to a shorter length (2 minutes by default) for CPU speed
def trim_audio(input_path, output_path, duration_sec=120):
    """Write the first ``duration_sec`` seconds of ``input_path`` to ``output_path``.

    ffmpeg is first asked for a stream copy (no re-encoding). If that fails,
    which can happen when the source codec does not fit the output container,
    a second attempt re-encodes the audio.

    Args:
        input_path: Source media file.
        output_path: Destination file; overwritten if it exists.
        duration_sec: Maximum length of the output in seconds.

    Returns:
        ``output_path`` when ffmpeg produced a non-empty file, otherwise
        ``input_path`` so the caller can continue with the untrimmed audio.
    """
    base_command = ["ffmpeg", "-y", "-i", input_path, "-t", str(duration_sec)]
    attempts = (
        base_command + ["-c", "copy", output_path],  # fast path: stream copy
        base_command + ["-vn", output_path],          # fallback: re-encode audio only
    )
    try:
        for command in attempts:
            completed = subprocess.run(
                command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE
            )
            # The exit status alone is not trusted: require a real output file.
            if (
                completed.returncode == 0
                and os.path.isfile(output_path)
                and os.path.getsize(output_path) > 0
            ):
                return output_path
        print("β Error trimming audio:", f"ffmpeg exited with code {completed.returncode}")
    except Exception as e:
        # Typically raised when the ffmpeg executable cannot be started.
        print("β Error trimming audio:", e)
    return input_path
# Shared helpers for the YouTube and upload flows
def _remove_quietly(path):
    """Delete ``path`` if possible; filesystem errors are ignored (best-effort cleanup)."""
    if path:
        try:
            os.remove(path)
        except OSError:
            pass


def _analyze_audio(audio_path, show_transcript):
    """Trim, transcribe and analyse ``audio_path``; return ``(status, result_text)``."""
    # mkstemp instead of the deprecated mktemp: the name is reserved atomically.
    fd, trimmed_path = tempfile.mkstemp(suffix=".mp3")
    os.close(fd)
    try:
        # Trim audio to 2 minutes before transcription. Use what trim_audio
        # reports, and fall back to the full file if no usable output exists.
        usable_path = trim_audio(audio_path, trimmed_path)
        if not (
            usable_path
            and os.path.isfile(usable_path)
            and os.path.getsize(usable_path) > 0
        ):
            usable_path = audio_path

        transcript = transcribe_audio(usable_path)
        # transcribe_audio reports failures as a message with this prefix.
        if transcript.startswith("β"):
            return transcript, ""

        stock_info = extract_stock_info(transcript)
        if show_transcript:
            return "β Complete", f"π Transcript:\n\n{transcript}\n\n\n{stock_info}"
        return "β Complete", stock_info
    finally:
        _remove_quietly(trimmed_path)


# YouTube flow
def run_pipeline(url, cookies_file, show_transcript):
    """Download a video's audio, transcribe it and extract stock insights.

    Args:
        url: Video URL; required.
        cookies_file: Optional uploaded cookies file forwarded to the download.
        show_transcript: When true, the transcript is prepended to the result.

    Returns:
        ``(status, result_text)``. Errors are reported through ``status``
        with an empty ``result_text``; nothing is raised.
    """
    cookie_path = None
    audio_path = None
    try:
        if not WHISPER_AVAILABLE:
            return "β Whisper not installed", ""
        if not url:
            return "β YouTube URL required", ""
        cookie_path = save_cookies(cookies_file)
        audio_path, status = download_audio(url, cookie_path)
        if not audio_path:
            return status, ""
        return _analyze_audio(audio_path, show_transcript)
    except Exception:
        tb = traceback.format_exc()
        print(tb)
        return f"β Unhandled Error:\n{tb}", ""
    finally:
        # Remove the temporary cookie copy and the downloaded audio. rmdir
        # only succeeds on an empty directory, so nothing else can be lost.
        _remove_quietly(cookie_path)
        if audio_path:
            _remove_quietly(audio_path)
            try:
                os.rmdir(os.path.dirname(audio_path))
            except OSError:
                pass


# Audio upload flow
def run_pipeline_audio(audio_file, show_transcript):
    """Transcribe an uploaded audio file and extract stock insights.

    Args:
        audio_file: ``None``, a filesystem path, or a file-like object with
            ``read()`` (its ``name`` attribute, if any, supplies the suffix).
        show_transcript: When true, the transcript is prepended to the result.

    Returns:
        ``(status, result_text)``. Errors are reported through ``status``
        with an empty ``result_text``; nothing is raised.
    """
    temp_audio_path = None
    try:
        if not WHISPER_AVAILABLE:
            return "β Whisper not installed", ""
        if audio_file is None:
            return "β No audio file uploaded", ""

        # Save uploaded file. The suffix comes from the file's name, not from
        # the repr of a file object.
        source_name = getattr(audio_file, "name", None) or str(audio_file)
        suffix = os.path.splitext(str(source_name))[-1]
        fd, temp_audio_path = tempfile.mkstemp(suffix=suffix)
        os.close(fd)
        if hasattr(audio_file, "read"):
            with open(temp_audio_path, "wb") as f:
                f.write(audio_file.read())
        else:
            shutil.copy(str(audio_file), temp_audio_path)

        return _analyze_audio(temp_audio_path, show_transcript)
    except Exception:
        tb = traceback.format_exc()
        print(tb)
        return f"β Unhandled Error:\n{tb}", ""
    finally:
        _remove_quietly(temp_audio_path)
# Gradio UI: two tabs, one per input source, each wired to its pipeline function.
# NOTE(review): the original indentation was lost; the Row is assumed to hold
# only the URL and cookie inputs. Confirm against the deployed layout.
with gr.Blocks(title="Stock Insights from YouTube or Audio") as demo:
    gr.Markdown("""
    # π Extract Stock Recommendations from YouTube or Uploaded Audio
    Upload a YouTube video or audio file. We'll transcribe it using Whisper and extract stock insights.
    """)

    with gr.Tab("πΊ From YouTube Video"):
        with gr.Row():
            url_input = gr.Textbox(label="π₯ YouTube URL")
            cookie_input = gr.File(label="cookies.txt (optional)", file_types=[".txt"])
        show_transcript_yt = gr.Checkbox(label="Show Transcript", value=False)
        yt_run_btn = gr.Button("π Extract from YouTube")
        yt_status = gr.Textbox(label="Status")
        yt_result = gr.Textbox(label="Transcript & Stock Info", lines=15)
        # Outputs map to the (status, result_text) pair returned by run_pipeline.
        yt_run_btn.click(
            fn=run_pipeline,
            inputs=[url_input, cookie_input, show_transcript_yt],
            outputs=[yt_status, yt_result],
        )

    with gr.Tab("π΅ From Uploaded Audio"):
        audio_input = gr.File(
            label="Upload Audio File",
            file_types=[".mp3", ".wav", ".m4a", ".webm"],
        )
        show_transcript_audio = gr.Checkbox(label="Show Transcript", value=False)
        audio_run_btn = gr.Button("π Extract from Audio")
        audio_status = gr.Textbox(label="Status")
        audio_result = gr.Textbox(label="Transcript & Stock Info", lines=15)
        # Outputs map to the (status, result_text) pair returned by run_pipeline_audio.
        audio_run_btn.click(
            fn=run_pipeline_audio,
            inputs=[audio_input, show_transcript_audio],
            outputs=[audio_status, audio_result],
        )

# Start the app only when the file is executed directly.
if __name__ == "__main__":
    demo.launch(debug=True)