File size: 13,448 Bytes
4fde749
a6e9713
0921abd
16e2e72
0921abd
16e2e72
0921abd
 
 
 
 
 
 
 
 
 
 
 
9518e76
 
0921abd
 
 
9518e76
d33e944
9518e76
 
 
 
 
 
 
 
 
 
 
0921abd
9518e76
 
0921abd
 
 
16e2e72
0921abd
 
 
 
9518e76
 
 
 
0921abd
9518e76
0921abd
9518e76
 
3a866c5
 
9518e76
 
 
 
 
 
 
 
0921abd
16e2e72
9518e76
 
 
 
a6273c7
9518e76
3a866c5
0921abd
9518e76
 
 
 
 
 
 
 
 
 
 
 
 
16e2e72
0921abd
9518e76
0921abd
 
9518e76
0921abd
16e2e72
9518e76
16e2e72
0921abd
9518e76
 
 
 
 
 
 
 
 
a6273c7
9518e76
16e2e72
0921abd
 
 
9518e76
0921abd
 
9518e76
 
 
 
0921abd
 
16e2e72
9518e76
 
0921abd
 
 
9518e76
 
 
 
 
 
 
0921abd
9518e76
 
 
 
 
0921abd
9518e76
 
3a866c5
 
9518e76
0921abd
9518e76
 
3a866c5
9518e76
 
0921abd
 
9518e76
3a866c5
9518e76
0921abd
9518e76
0921abd
9518e76
 
 
0921abd
 
9518e76
 
 
3a866c5
9518e76
 
 
 
 
 
16e2e72
0921abd
16e2e72
0921abd
9518e76
16e2e72
0921abd
 
 
 
 
 
 
 
 
 
 
 
 
9518e76
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16e2e72
9518e76
0921abd
9518e76
 
 
 
 
0921abd
9518e76
3a866c5
9518e76
 
 
 
 
 
3a866c5
9518e76
3a866c5
9518e76
 
16e2e72
0921abd
 
9518e76
16e2e72
0921abd
9518e76
16e2e72
9518e76
 
16e2e72
0921abd
16e2e72
9518e76
 
 
3644986
0921abd
9518e76
 
16e2e72
0921abd
9518e76
0921abd
16e2e72
0921abd
9518e76
16e2e72
0921abd
9518e76
 
3644986
9518e76
 
16e2e72
 
9518e76
 
0921abd
 
 
9518e76
0921abd
9518e76
0921abd
9518e76
0921abd
 
 
3a866c5
0921abd
 
 
 
 
 
9518e76
0921abd
9518e76
0921abd
9518e76
 
 
 
0921abd
9518e76
0921abd
 
9518e76
 
 
 
 
 
 
 
 
 
 
0921abd
 
9518e76
 
 
 
 
16e2e72
9518e76
 
 
 
 
 
 
 
 
 
 
 
 
16e2e72
0921abd
 
 
 
9518e76
 
 
0921abd
 
 
 
 
9518e76
 
 
3a866c5
 
0921abd
 
 
9518e76
 
0921abd
 
 
9518e76
 
 
 
 
 
 
 
 
0921abd
16e2e72
9518e76
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
import os
import tempfile
import gradio as gr
import re
import sys

# Try to import required packages with error handling
try:
    from yt_dlp import YoutubeDL
    YT_DLP_AVAILABLE = True
except ImportError as e:
    YT_DLP_AVAILABLE = False
    print(f"yt-dlp import error: {e}")

try:
    import whisper
    WHISPER_AVAILABLE = True
except ImportError as e:
    WHISPER_AVAILABLE = False
    print(f"whisper import error: {e}")

print(f"Python version: {sys.version}")
print(f"yt-dlp available: {YT_DLP_AVAILABLE}")
print(f"whisper available: {WHISPER_AVAILABLE}")

def get_cookies_path():
    """Get the path to cookies.txt file"""
    # Check if cookies.txt exists in the current directory
    if os.path.exists('cookies.txt'):
        return 'cookies.txt'
    # Check in the same directory as the script
    script_dir = os.path.dirname(os.path.abspath(__file__))
    cookies_path = os.path.join(script_dir, 'cookies.txt')
    if os.path.exists(cookies_path):
        return cookies_path
    return None

def download_audio(url):
    """Download audio from YouTube URL and return the file path"""
    if not YT_DLP_AVAILABLE:
        raise Exception("yt-dlp is not available. Please check the installation.")
    
    try:
        # Create a temporary directory for downloads
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, "audio")
        
        # Get cookies path
        cookies_path = get_cookies_path()
        
        # Base yt-dlp options
        ydl_opts = {
            'format': 'bestaudio[ext=m4a]/bestaudio/best',
            'outtmpl': output_path + '.%(ext)s',
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'ignoreerrors': False,
            # Add user agent to avoid bot detection
            'http_headers': {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            },
            # Add additional options to avoid bot detection
            'extractor_retries': 3,
            'fragment_retries': 3,
            'retry_sleep_functions': {'http': lambda n: 2 ** n},
        }
        
        # Add cookies if available
        if cookies_path:
            ydl_opts['cookiefile'] = cookies_path
            print(f"Using cookies from: {cookies_path}")
        else:
            print("No cookies.txt found - proceeding without cookies")
        
        with YoutubeDL(ydl_opts) as ydl:
            # Extract info first to check if video is available
            info_dict = ydl.extract_info(url, download=False)
            
            # Check if video is available
            if info_dict.get('availability') == 'private':
                raise Exception("Video is private")
            elif info_dict.get('availability') == 'premium_only':
                raise Exception("Video requires premium subscription")
            elif info_dict.get('live_status') == 'is_live':
                raise Exception("Cannot download live streams")
            
            # Download the audio
            ydl.download([url])
            
            # Find the downloaded file
            for ext in ['.m4a', '.webm', '.mp4', '.mp3']:
                potential_file = output_path + ext
                if os.path.exists(potential_file):
                    print(f"Successfully downloaded: {potential_file}")
                    return potential_file
            
            raise FileNotFoundError(f"Downloaded audio file not found")
                
    except Exception as e:
        error_msg = str(e)
        if "Sign in to confirm your age" in error_msg:
            raise Exception("Video is age-restricted. Please use a different video or update your cookies.")
        elif "Private video" in error_msg:
            raise Exception("Video is private and cannot be accessed.")
        elif "This video is unavailable" in error_msg:
            raise Exception("Video is unavailable or has been removed.")
        elif "blocked" in error_msg.lower():
            raise Exception("Access to this video is blocked. Try using updated cookies or a different video.")
        else:
            raise Exception(f"Failed to download audio: {error_msg}")

def transcribe_audio(file_path):
    """Transcribe audio file using Whisper"""
    if not WHISPER_AVAILABLE:
        raise Exception("OpenAI Whisper is not available. Please check the installation.")
    
    try:
        # Use the smallest model to reduce memory usage
        model = whisper.load_model("tiny")
        result = model.transcribe(file_path)
        return result["text"]
    except Exception as e:
        raise Exception(f"Failed to transcribe audio: {str(e)}")

def extract_stock_info_simple(text):
    """Extract stock information using simple pattern matching"""
    try:
        stock_info = []
        
        # Simple patterns to look for stock-related information
        stock_patterns = [
            r'\b[A-Z]{1,5}\b(?:\s+stock|\s+shares|\s+symbol)',  # Stock symbols
            r'(?:buy|sell|target|price)\s+[A-Z]{1,5}',
            r'\$\d+(?:\.\d{2})?',  # Dollar amounts
            r'\b(?:bullish|bearish|buy|sell|hold)\b',
        ]
        
        # Look for company names and stock mentions
        companies = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*(?:\s+(?:Inc|Corp|Company|Ltd)\.?)?', text)
        symbols = re.findall(r'\b[A-Z]{2,5}\b', text)
        prices = re.findall(r'\$\d+(?:\.\d{2})?', text)
        actions = re.findall(r'\b(?:buy|sell|hold|bullish|bearish|target|stop\s+loss)\b', text, re.IGNORECASE)
        
        # Format the extracted information
        result = "=== EXTRACTED STOCK INFORMATION ===\n\n"
        
        if companies:
            result += f"πŸ“Š Mentioned Companies: {', '.join(set(companies[:10]))}\n\n"
        
        if symbols:
            result += f"πŸ”€ Potential Stock Symbols: {', '.join(set(symbols[:10]))}\n\n"
        
        if prices:
            result += f"πŸ’° Price Mentions: {', '.join(set(prices[:10]))}\n\n"
        
        if actions:
            result += f"πŸ“ˆ Trading Actions: {', '.join(set(actions[:10]))}\n\n"
        
        # Look for specific recommendation patterns
        recommendations = []
        sentences = text.split('.')
        for sentence in sentences:
            if any(word in sentence.lower() for word in ['buy', 'sell', 'target', 'recommendation']):
                if any(symbol in sentence for symbol in symbols[:5]):
                    recommendations.append(sentence.strip())
        
        if recommendations:
            result += "🎯 Potential Recommendations:\n"
            for rec in recommendations[:5]:
                result += f"β€’ {rec}\n"
        
        if not any([companies, symbols, prices, actions]):
            result += "⚠️ No clear stock recommendations found in the transcript.\n"
            result += "This might be because:\n"
            result += "β€’ The video doesn't contain stock recommendations\n"
            result += "β€’ The audio quality was poor\n"
            result += "β€’ The content is not in English\n"
        
        return result
        
    except Exception as e:
        return f"Error extracting stock info: {str(e)}"

def cleanup_file(file_path):
    """Clean up temporary files"""
    try:
        if file_path and os.path.exists(file_path):
            os.remove(file_path)
            # Also try to remove the directory if it's empty
            try:
                os.rmdir(os.path.dirname(file_path))
            except:
                pass
    except:
        pass

def system_test():
    """Test system components"""
    results = []
    
    # Test yt-dlp
    if YT_DLP_AVAILABLE:
        results.append("βœ… yt-dlp: Available")
        try:
            ydl = YoutubeDL({'quiet': True})
            results.append("βœ… yt-dlp: Can create YoutubeDL instance")
        except Exception as e:
            results.append(f"❌ yt-dlp: Cannot create instance - {e}")
    else:
        results.append("❌ yt-dlp: Not available")
    
    # Test Whisper
    if WHISPER_AVAILABLE:
        results.append("βœ… Whisper: Available (Type: openai-whisper)")
        try:
            import whisper
            results.append("βœ… Whisper: OpenAI Whisper can be imported")
        except Exception as e:
            results.append(f"❌ Whisper: Cannot import - {e}")
    else:
        results.append("❌ Whisper: Not available")
    
    # Test file operations
    try:
        temp_file = tempfile.NamedTemporaryFile(delete=False)
        temp_file.write(b"test")
        temp_file.close()
        os.remove(temp_file.name)
        results.append("βœ… File operations: Working")
    except Exception as e:
        results.append(f"❌ File operations: Failed - {e}")
    
    # Test cookies
    cookies_path = get_cookies_path()
    if cookies_path:
        results.append(f"βœ… Cookies: Found at {cookies_path}")
    else:
        results.append("⚠️ Cookies: Not found (may cause bot detection issues)")
    
    return "\n".join(results)

def process_video(url, progress=gr.Progress()):
    """Main function to process YouTube video"""
    
    # Check if required packages are available
    if not YT_DLP_AVAILABLE:
        return "Error: yt-dlp is not installed properly. Please check the requirements.", ""
    
    if not WHISPER_AVAILABLE:
        return "Error: OpenAI Whisper is not installed properly. Please check the requirements.", ""
    
    if not url or not url.strip():
        return "Please provide a valid YouTube URL", ""
    
    audio_path = None
    try:
        # Validate URL
        if not any(domain in url.lower() for domain in ['youtube.com', 'youtu.be']):
            return "Please provide a valid YouTube URL", ""
        
        # Download audio
        progress(0.1, desc="Downloading audio...")
        audio_path = download_audio(url)
        
        # Transcribe audio
        progress(0.5, desc="Transcribing audio...")
        transcript = transcribe_audio(audio_path)
        
        if not transcript.strip():
            return "No speech detected in the video", ""
        
        # Extract stock information
        progress(0.8, desc="Extracting stock information...")
        stock_details = extract_stock_info_simple(transcript)
        
        progress(1.0, desc="Complete!")
        return transcript, stock_details
        
    except Exception as e:
        error_msg = f"Error processing video: {str(e)}"
        return error_msg, ""
    
    finally:
        # Clean up temporary files
        cleanup_file(audio_path)

# Create Gradio interface
with gr.Blocks(
    title="Stock Recommendation Extractor",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1200px;
        margin: auto;
    }
    """
) as demo:
    
    gr.Markdown("""
    # πŸ“ˆ Stock Recommendation Extractor from YouTube
    
    Extract stock recommendations and trading information from YouTube videos using AI transcription.
    
    **How it works:**
    1. Downloads audio from YouTube video
    2. Transcribes using OpenAI Whisper
    3. Extracts stock-related information
    
    **⚠️ Disclaimer:** This is for educational purposes only. Always do your own research!
    """)
    
    # Add system test section
    with gr.Accordion("πŸ§ͺ System Status", open=False):
        system_status = gr.Textbox(
            value=system_test(),
            label="System Test Results",
            lines=10,
            interactive=False
        )
        test_btn = gr.Button("πŸ”„ Re-run System Test")
        test_btn.click(fn=system_test, outputs=system_status)
    
    with gr.Row():
        with gr.Column(scale=1):
            url_input = gr.Textbox(
                label="πŸ“Ί YouTube URL",
                placeholder="https://www.youtube.com/watch?v=...",
                lines=2
            )
            
            process_btn = gr.Button(
                "πŸš€ Extract Stock Information", 
                variant="primary",
                size="lg"
            )
            
            gr.Markdown("""
            ### πŸ’‘ Tips:
            - Works best with financial YouTube channels
            - Ensure video has clear audio
            - English content works best
            - If you get bot detection errors, try updating cookies.txt
            """)
    
    with gr.Row():
        with gr.Column():
            transcript_output = gr.Textbox(
                label="πŸ“ Full Transcript",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )
        
        with gr.Column():
            stock_info_output = gr.Textbox(
                label="πŸ“Š Extracted Stock Information",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )
    
    # Event handlers
    process_btn.click(
        fn=process_video,
        inputs=[url_input],
        outputs=[transcript_output, stock_info_output],
        show_progress=True
    )
    
    # Example section
    gr.Markdown("### πŸ“‹ Example URLs (Replace with actual financial videos)")
    gr.Examples(
        examples=[
            ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"],
        ],
        inputs=[url_input],
        label="Click to try example"
    )

if __name__ == "__main__":
    demo.launch()