Samuel Thomas committed on
Commit 0264a40 · 1 Parent(s): 7ab4cd0

youtube transcript correction

Files changed (1)
  1. tools.py +215 -488

tools.py CHANGED
@@ -2124,42 +2124,58 @@ def create_enhanced_youtube_qa_tool(**kwargs):
    """Factory function to create the enhanced tool with custom parameters"""
    return EnhancedYoutubeScreenshotQA(**kwargs)


class YouTubeTranscriptExtractor(BaseTool):
    name: str = "youtube_transcript_extractor"
    description: str = (
-       "Downloads a YouTube video and extracts the complete audio transcript using speech recognition with speaker identification. "
-       #"Use this tool for AUDIO questions, when the youtube question involves what a person says,"
-       "Use this tool for questions like 'what does jim say in response to a question in this video',"
        "Input should be a dict with keys: 'youtube_url' and optional parameters. "
-       #"Optional parameters: 'language' (default: 'en-US'), 'chunk_length_ms' (default: 30000), "
-       #"'silence_thresh' (default: -40), 'use_enhanced_model' (default: True), 'audio_quality' (default: 'best'), "
-       #"'enable_speaker_id' (default: True), 'max_speakers' (default: 5), 'speaker_min_duration' (default: 2.0). "
-       "Example: {'youtube_url': 'https://youtube.com/watch?v=xyz', 'language': 'en-US', 'enable_speaker_id': True}"
    )

    # Define Pydantic fields for the attributes we need to set
    recognizer: Any = Field(default=None, exclude=True)

    class Config:
-       # Allow arbitrary types
        arbitrary_types_allowed = True
-       # Allow extra fields to be set
        extra = "allow"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Initialize directories
-       cache_dir = '/tmp/youtube_transcript_cache/'
-       audio_dir = '/tmp/audio/'
-       chunks_dir = '/tmp/audio_chunks/'

        # Initialize speech recognizer
        self.recognizer = sr.Recognizer()

        # Create directories
-       for dir_path in [cache_dir, audio_dir, chunks_dir]:
            os.makedirs(dir_path, exist_ok=True)

    def _get_config(self, key: str, default_value=None, input_data: Dict[str, Any] = None):
@@ -2168,19 +2184,10 @@ class YouTubeTranscriptExtractor(BaseTool):
            'language': 'en-US',
            'chunk_length_ms': 30000, # 30 seconds
            'silence_thresh': -40, # dB
-           'use_enhanced_model': True,
            'audio_quality': 'best',
            'cache_enabled': True,
-           'parallel_processing': True,
-           'overlap_ms': 1000, # 1 second overlap between chunks
            'min_silence_len': 500, # minimum silence length to split on
-           'energy_threshold': 4000, # recognizer energy threshold
-           'pause_threshold': 0.8, # recognizer pause threshold
-           'enable_speaker_id': True, # enable speaker identification
-           'max_speakers': 5, # maximum number of speakers to identify
-           'speaker_min_duration': 2.0, # minimum duration (seconds) for speaker segment
-           'speaker_confidence_threshold': 0.6, # confidence threshold for speaker assignment
-           'voice_activity_threshold': 0.01 # threshold for voice activity detection
        }

        if input_data and key in input_data:
@@ -2193,8 +2200,7 @@ class YouTubeTranscriptExtractor(BaseTool):

    def _get_cache_path(self, video_hash: str, cache_type: str) -> str:
        """Get cache file path"""
-       cache_dir = '/tmp/youtube_transcript_cache/'
-       return os.path.join(cache_dir, f"{video_hash}_{cache_type}")

    def _load_from_cache(self, cache_path: str, cache_enabled: bool = True) -> Optional[Any]:
        """Load data from cache"""
@@ -2217,12 +2223,24 @@ class YouTubeTranscriptExtractor(BaseTool):
        except Exception as e:
            print(f"Error saving cache: {str(e)}")

    def download_youtube_audio(self, url: str, video_hash: str, input_data: Dict[str, Any] = None) -> Optional[str]:
        """Download YouTube video as audio file"""
-       audio_dir = '/tmp/audio/'
        audio_quality = self._get_config('audio_quality', 'best', input_data)
        output_filename = f'{video_hash}.wav'
-       output_path = os.path.join(audio_dir, output_filename)

        # Check cache
        cache_enabled = self._get_config('cache_enabled', True, input_data)
@@ -2231,147 +2249,97 @@ class YouTubeTranscriptExtractor(BaseTool):
            return output_path

        # Clean directory
-       self._clean_directory(audio_dir)

        try:
-           # First download as mp4/webm
-           temp_video_path = os.path.join(audio_dir, f'{video_hash}_temp.%(ext)s')
-
            ydl_opts = {
-               'format': 'bestaudio/best' if audio_quality == 'best' else 'worstaudio/worst',
-               'outtmpl': temp_video_path,
-               'quiet': True,
-               'extractaudio': True,
-               'audioformat': 'wav',
-               'audioquality': '192K' if audio_quality == 'best' else '64K',
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

-           # Find the downloaded file
-           temp_files = glob.glob(os.path.join(audio_dir, f'{video_hash}_temp.*'))
-           if not temp_files:
-               print("No temporary audio file found")
-               return None
-
-           temp_file = temp_files[0]
-
-           # Convert to WAV if not already
-           if not temp_file.endswith('.wav'):
-               try:
-                   audio = AudioSegment.from_file(temp_file)
-                   audio.export(output_path, format="wav")
-                   os.remove(temp_file) # Clean up temp file
-               except Exception as e:
-                   print(f"Error converting audio: {str(e)}")
-                   # Try to rename if it's already the right format
-                   if os.path.exists(temp_file):
-                       os.rename(temp_file, output_path)
-           else:
-               os.rename(temp_file, output_path)
-
            if os.path.exists(output_path):
-               print(f"Audio extracted successfully: {output_path}")
                return output_path
            else:
-               print("Audio extraction completed but file not found")
                return None

        except Exception as e:
            print(f"Error downloading YouTube audio: {str(e)}")
            return None

-   def _clean_directory(self, directory: str):
-       """Clean directory contents"""
-       if os.path.exists(directory):
-           for filename in os.listdir(directory):
-               file_path = os.path.join(directory, filename)
-               try:
-                   if os.path.isfile(file_path) or os.path.islink(file_path):
-                       os.unlink(file_path)
-                   elif os.path.isdir(file_path):
-                       shutil.rmtree(file_path)
-               except Exception as e:
-                   print(f'Failed to delete {file_path}. Reason: {e}')
-
-   def _extract_voice_features(self, audio_path: str) -> Optional[np.ndarray]:
-       """Extract voice features for speaker identification using librosa"""
-       try:
-           # Load audio with librosa
-           y, sr = librosa.load(audio_path, sr=None)
-
-           # Extract MFCC features (commonly used for speaker identification)
-           mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
-
-           # Extract additional features
-           spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)
-           spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)
-           zero_crossing_rate = librosa.feature.zero_crossing_rate(y)
-
-           # Combine features and take mean across time
-           features = np.concatenate([
-               np.mean(mfccs, axis=1),
-               np.mean(spectral_centroids),
-               np.mean(spectral_rolloff),
-               np.mean(zero_crossing_rate)
-           ])
-
-           return features
-
-       except Exception as e:
-           print(f"Error extracting voice features from {audio_path}: {str(e)}")
-           return None
-
-   def _detect_voice_activity(self, audio_path: str, input_data: Dict[str, Any] = None) -> List[Tuple[float, float]]:
-       """Detect voice activity in audio chunk"""
-       try:
-           y, sr = librosa.load(audio_path, sr=None)
-
-           # Simple voice activity detection based on energy
-           frame_length = int(0.025 * sr) # 25ms frames
-           hop_length = int(0.010 * sr) # 10ms hop
-
-           # Calculate short-time energy
-           energy = []
-           for i in range(0, len(y) - frame_length, hop_length):
-               frame = y[i:i + frame_length]
-               energy.append(np.sum(frame ** 2))
-
-           energy = np.array(energy)
-           threshold = self._get_config('voice_activity_threshold', 0.01, input_data)
-
-           # Find voice segments
-           voice_frames = energy > (np.max(energy) * threshold)
-
-           # Convert frame indices to time segments
-           voice_segments = []
-           in_voice = False
-           start_time = 0
-
-           for i, is_voice in enumerate(voice_frames):
-               time_sec = i * hop_length / sr
-               if is_voice and not in_voice:
-                   start_time = time_sec
-                   in_voice = True
-               elif not is_voice and in_voice:
-                   voice_segments.append((start_time, time_sec))
-                   in_voice = False
-
-           # Close last segment if needed
-           if in_voice:
-               voice_segments.append((start_time, len(y) / sr))
-
-           return voice_segments
-
-       except Exception as e:
-           print(f"Error in voice activity detection: {str(e)}")
-           return [(0, librosa.get_duration(filename=audio_path))]
-
    def _split_audio_intelligent(self, audio_path: str, input_data: Dict[str, Any] = None) -> List[Dict[str, Any]]:
-       """Split audio into chunks intelligently based on silence and voice activity"""
-       chunks_dir = '/tmp/audio_chunks/'
-       self._clean_directory(chunks_dir)

        try:
            # Load audio
@@ -2402,40 +2370,46 @@ class YouTubeTranscriptExtractor(BaseTool):

            # Save chunks and create metadata
            chunk_data = []
            for i, chunk in enumerate(chunks):
                if len(chunk) < 1000: # Skip very short chunks
                    continue

-               chunk_filename = os.path.join(chunks_dir, f"chunk_{i:04d}.wav")
                chunk.export(chunk_filename, format="wav")

-               # Calculate timing information
-               start_time = sum(len(chunks[j]) for j in range(i)) / 1000.0 # in seconds
                duration = len(chunk) / 1000.0 # in seconds

                chunk_info = {
                    'filename': chunk_filename,
                    'index': i,
-                   'start_time': start_time,
                    'duration': duration,
-                   'end_time': start_time + duration
                }

                chunk_data.append(chunk_info)

            print(f"Split audio into {len(chunk_data)} chunks")
            return chunk_data

        except Exception as e:
            print(f"Error splitting audio: {str(e)}")
-           # Fallback: return original file
-           return [{
-               'filename': audio_path,
-               'index': 0,
-               'start_time': 0,
-               'duration': len(AudioSegment.from_wav(audio_path)) / 1000.0,
-               'end_time': len(AudioSegment.from_wav(audio_path)) / 1000.0
-           }]

    def _transcribe_audio_chunk(self, chunk_info: Dict[str, Any], input_data: Dict[str, Any] = None) -> Dict[str, Any]:
        """Transcribe a single audio chunk"""
@@ -2443,375 +2417,115 @@ class YouTubeTranscriptExtractor(BaseTool):
        try:
            language = self._get_config('language', 'en-US', input_data)

-           # Configure recognizer
-           self.recognizer.energy_threshold = self._get_config('energy_threshold', 4000, input_data)
-           self.recognizer.pause_threshold = self._get_config('pause_threshold', 0.8, input_data)
-
            with sr.AudioFile(chunk_path) as source:
                # Adjust for ambient noise
                self.recognizer.adjust_for_ambient_noise(source, duration=0.5)
                audio_data = self.recognizer.record(source)

-           # Try Google Speech Recognition first (most accurate)
            try:
                text = self.recognizer.recognize_google(audio_data, language=language)
-               result = {
                    'text': text,
-                   'confidence': 1.0, # Google doesn't provide confidence
-                   'method': 'google',
-                   'chunk': os.path.basename(chunk_path),
                    'start_time': chunk_info['start_time'],
                    'end_time': chunk_info['end_time'],
                    'duration': chunk_info['duration'],
-                   'index': chunk_info['index']
                }

-               # Extract voice features if speaker ID is enabled
-               if self._get_config('enable_speaker_id', True, input_data):
-                   features = self._extract_voice_features(chunk_path)
-                   result['voice_features'] = features.tolist() if features is not None else None
-
-               return result
-
            except sr.UnknownValueError:
-               # Try alternative recognition methods
                try:
-                   # Try with alternative language detection
                    text = self.recognizer.recognize_google(audio_data)
-                   result = {
                        'text': text,
-                       'confidence': 0.8, # Lower confidence for language mismatch
-                       'method': 'google_auto',
-                       'chunk': os.path.basename(chunk_path),
                        'start_time': chunk_info['start_time'],
                        'end_time': chunk_info['end_time'],
                        'duration': chunk_info['duration'],
-                       'index': chunk_info['index']
                    }
-
-                   if self._get_config('enable_speaker_id', True, input_data):
-                       features = self._extract_voice_features(chunk_path)
-                       result['voice_features'] = features.tolist() if features is not None else None
-
-                   return result
-
                except sr.UnknownValueError:
                    return {
                        'text': '[INAUDIBLE]',
                        'confidence': 0.0,
-                       'method': 'failed',
-                       'chunk': os.path.basename(chunk_path),
                        'start_time': chunk_info['start_time'],
                        'end_time': chunk_info['end_time'],
                        'duration': chunk_info['duration'],
                        'index': chunk_info['index'],
-                       'voice_features': None
                    }
            except sr.RequestError as e:
-               print(f"Google Speech Recognition error: {e}")
                return {
-                   'text': '[RECOGNITION_ERROR]',
                    'confidence': 0.0,
-                   'method': 'error',
-                   'chunk': os.path.basename(chunk_path),
                    'start_time': chunk_info['start_time'],
                    'end_time': chunk_info['end_time'],
                    'duration': chunk_info['duration'],
                    'index': chunk_info['index'],
-                   'error': str(e),
-                   'voice_features': None
                }

        except Exception as e:
-           print(f"Error transcribing chunk {chunk_path}: {str(e)}")
            return {
-               'text': '[ERROR]',
                'confidence': 0.0,
-               'method': 'error',
-               'chunk': os.path.basename(chunk_path),
                'start_time': chunk_info.get('start_time', 0),
                'end_time': chunk_info.get('end_time', 0),
                'duration': chunk_info.get('duration', 0),
                'index': chunk_info.get('index', 0),
-               'error': str(e),
-               'voice_features': None
            }

-   def _identify_speakers(self, transcript_results: List[Dict[str, Any]], input_data: Dict[str, Any] = None) -> List[Dict[str, Any]]:
-       """Identify speakers using voice features clustering"""
-       enable_speaker_id = self._get_config('enable_speaker_id', True, input_data)
-       if not enable_speaker_id:
-           # Add default speaker tags
-           for result in transcript_results:
-               result['speaker_id'] = 'SPEAKER_1'
-               result['speaker_confidence'] = 1.0
-           return transcript_results
-
-       try:
-           # Filter results with valid voice features and text
-           valid_results = []
-           features_list = []
-
-           for result in transcript_results:
-               if (result.get('voice_features') is not None and
-                       result['text'] not in ['[INAUDIBLE]', '[RECOGNITION_ERROR]', '[ERROR]', '[PROCESSING_ERROR]']):
-                   valid_results.append(result)
-                   features_list.append(result['voice_features'])
-
-           if len(features_list) < 2:
-               # Not enough data for clustering
-               for result in transcript_results:
-                   result['speaker_id'] = 'SPEAKER_1'
-                   result['speaker_confidence'] = 1.0
-               return transcript_results
-
-           # Normalize features
-           features_array = np.array(features_list)
-           scaler = StandardScaler()
-           normalized_features = scaler.fit_transform(features_array)
-
-           # Determine optimal number of speakers
-           max_speakers = min(self._get_config('max_speakers', 5, input_data), len(features_list))
-
-           # Use elbow method to find optimal clusters (simplified)
-           best_k = 1
-           if len(features_list) > 1:
-               best_score = float('inf')
-               for k in range(1, min(max_speakers + 1, len(features_list) + 1)):
-                   try:
-                       kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
-                       labels = kmeans.fit_predict(normalized_features)
-                       if k > 1:
-                           score = kmeans.inertia_
-                           if score < best_score:
-                               best_score = score
-                               best_k = k
-                   except:
-                       continue
-
-           # Don't use too many clusters for short audio
-           if len(features_list) < 10:
-               best_k = min(best_k, 2)
-
-           # Perform final clustering
-           kmeans = KMeans(n_clusters=best_k, random_state=42, n_init=10)
-           speaker_labels = kmeans.fit_predict(normalized_features)
-
-           # Calculate speaker assignment confidence
-           distances = kmeans.transform(normalized_features)
-           confidences = []
-           for i, label in enumerate(speaker_labels):
-               # Confidence based on distance to assigned cluster vs. nearest other cluster
-               dist_to_assigned = distances[i][label]
-               other_distances = np.delete(distances[i], label)
-               if len(other_distances) > 0:
-                   dist_to_nearest_other = np.min(other_distances)
-                   confidence = max(0.1, min(1.0, dist_to_nearest_other / (dist_to_assigned + 1e-6)))
-               else:
-                   confidence = 1.0
-               confidences.append(confidence)
-
-           # Assign speaker IDs back to results
-           valid_idx = 0
-           speaker_duration = {} # Track duration per speaker
-
-           for result in transcript_results:
-               if (result.get('voice_features') is not None and
-                       result['text'] not in ['[INAUDIBLE]', '[RECOGNITION_ERROR]', '[ERROR]', '[PROCESSING_ERROR]']):
-
-                   speaker_label = speaker_labels[valid_idx]
-                   confidence = confidences[valid_idx]
-
-                   # Filter by confidence threshold
-                   conf_threshold = self._get_config('speaker_confidence_threshold', 0.6, input_data)
-                   if confidence < conf_threshold:
-                       speaker_id = 'SPEAKER_UNKNOWN'
-                   else:
-                       speaker_id = f'SPEAKER_{speaker_label + 1}'
-
-                   result['speaker_id'] = speaker_id
-                   result['speaker_confidence'] = confidence
-
-                   # Track speaker duration
-                   if speaker_id in speaker_duration:
-                       speaker_duration[speaker_id] += result['duration']
-                   else:
-                       speaker_duration[speaker_id] = result['duration']
-
-                   valid_idx += 1
-               else:
-                   # Handle invalid results
-                   result['speaker_id'] = 'SPEAKER_UNKNOWN'
-                   result['speaker_confidence'] = 0.0
-
-           # Filter out speakers with insufficient duration
-           min_duration = self._get_config('speaker_min_duration', 2.0, input_data)
-           speakers_to_merge = [s for s, d in speaker_duration.items() if d < min_duration and s != 'SPEAKER_UNKNOWN']
-
-           # Merge low-duration speakers into SPEAKER_UNKNOWN
-           for result in transcript_results:
-               if result['speaker_id'] in speakers_to_merge:
-                   result['speaker_id'] = 'SPEAKER_UNKNOWN'
-                   result['speaker_confidence'] = 0.3
-
-           print(f"Identified {best_k} speakers based on voice characteristics")
-           return transcript_results
-
-       except Exception as e:
-           print(f"Error in speaker identification: {str(e)}")
-           # Fallback: assign all to single speaker
-           for result in transcript_results:
-               result['speaker_id'] = 'SPEAKER_1'
-               result['speaker_confidence'] = 1.0
-           return transcript_results
-
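(Aside: the _identify_speakers method removed above clustered per-chunk voice features with KMeans. For reference, a compact standalone sketch of that idea, with hypothetical chunk paths and assuming librosa, numpy, and scikit-learn are available; this sketch is not part of the commit:)

# Hypothetical sketch of the MFCC + KMeans speaker clustering the removed code used.
import numpy as np
import librosa
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

def cluster_speakers(chunk_paths, n_speakers=2):
    feats = []
    for path in chunk_paths:
        y, sr = librosa.load(path, sr=None)
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)  # (13, n_frames)
        feats.append(np.mean(mfcc, axis=1))                 # one vector per chunk
    X = StandardScaler().fit_transform(np.array(feats))     # normalize features
    labels = KMeans(n_clusters=n_speakers, n_init=10, random_state=42).fit_predict(X)
    return [f"SPEAKER_{label + 1}" for label in labels]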
    def _transcribe_chunks_parallel(self, chunk_data: List[Dict[str, Any]], input_data: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Transcribe audio chunks in parallel"""
        results = []
-       parallel_processing = self._get_config('parallel_processing', True, input_data)
-
-       if parallel_processing:
-           # Use fewer workers for speech recognition to avoid API limits
-           max_workers = min(3, len(chunk_data))
-           with ThreadPoolExecutor(max_workers=max_workers) as executor:
-               future_to_chunk = {
-                   executor.submit(self._transcribe_audio_chunk, chunk_info, input_data): chunk_info
-                   for chunk_info in chunk_data
-               }

-               for future in as_completed(future_to_chunk):
-                   chunk_info = future_to_chunk[future]
-                   try:
-                       result = future.result()
-                       results.append(result)
-                       print(f"Transcribed {result['chunk']}: {result['text'][:50]}..." if len(result['text']) > 50 else f"Transcribed {result['chunk']}: {result['text']}")
-                   except Exception as e:
-                       print(f"Error processing {chunk_info['filename']}: {str(e)}")
-                       results.append({
-                           'text': '[PROCESSING_ERROR]',
-                           'confidence': 0.0,
-                           'method': 'error',
-                           'chunk': os.path.basename(chunk_info['filename']),
-                           'start_time': chunk_info.get('start_time', 0),
-                           'end_time': chunk_info.get('end_time', 0),
-                           'duration': chunk_info.get('duration', 0),
-                           'index': chunk_info.get('index', 0),
-                           'error': str(e),
-                           'voice_features': None
-                       })
-       else:
-           for chunk_info in chunk_data:
-               result = self._transcribe_audio_chunk(chunk_info, input_data)
-               results.append(result)
-               print(f"Transcribed {result['chunk']}: {result['text'][:50]}..." if len(result['text']) > 50 else f"Transcribed {result['chunk']}: {result['text']}")

        # Sort results by chunk index to maintain order
        results.sort(key=lambda x: x['index'])
        return results

-   def _post_process_transcript(self, transcript_results: List[Dict[str, Any]], input_data: Dict[str, Any] = None) -> Dict[str, Any]:
-       """Post-process and analyze transcript results with speaker information"""
-       enable_speaker_id = self._get_config('enable_speaker_id', True, input_data)
-
-       # Identify speakers if enabled
-       if enable_speaker_id:
-           transcript_results = self._identify_speakers(transcript_results, input_data)
-
-       # Combine text with speaker tags
-       full_text_parts = []
-       speaker_tagged_text = []
-       successful_chunks = 0
-       total_confidence = 0.0
-       method_counts = {}
-       speaker_stats = {}
-
-       current_speaker = None
-       current_speaker_text = []
-
-       for result in transcript_results:
-           text = result['text']
-           speaker = result.get('speaker_id', 'SPEAKER_1')
-           start_time = result.get('start_time', 0)
-
-           if text not in ['[INAUDIBLE]', '[RECOGNITION_ERROR]', '[ERROR]', '[PROCESSING_ERROR]']:
-               full_text_parts.append(text)
-               successful_chunks += 1
-               total_confidence += result['confidence']
-
-               # Handle speaker transitions
-               if enable_speaker_id:
-                   if current_speaker != speaker:
-                       # Save previous speaker's text
-                       if current_speaker and current_speaker_text:
-                           combined_text = ' '.join(current_speaker_text)
-                           speaker_tagged_text.append(f"[{current_speaker}]: {combined_text}")
-
-                       # Start new speaker
-                       current_speaker = speaker
-                       current_speaker_text = [text]
-                   else:
-                       # Continue with same speaker
-                       current_speaker_text.append(text)
-               else:
-                   speaker_tagged_text.append(text)
-
-               # Update speaker statistics
-               if speaker in speaker_stats:
-                   speaker_stats[speaker]['duration'] += result.get('duration', 0)
-                   speaker_stats[speaker]['word_count'] += len(text.split())
-                   speaker_stats[speaker]['segments'] += 1
-               else:
-                   speaker_stats[speaker] = {
-                       'duration': result.get('duration', 0),
-                       'word_count': len(text.split()),
-                       'segments': 1,
-                       'confidence': result.get('speaker_confidence', 1.0)
-                   }
-
-           method = result['method']
-           method_counts[method] = method_counts.get(method, 0) + 1
-
-       # Add final speaker text
-       if enable_speaker_id and current_speaker and current_speaker_text:
-           combined_text = ' '.join(current_speaker_text)
-           speaker_tagged_text.append(f"[{current_speaker}]: {combined_text}")
-
-       # Combine texts
-       combined_text = ' '.join(full_text_parts)
-       speaker_formatted_text = combined_text
-
-       # Calculate statistics
-       word_count = len(combined_text.split()) if combined_text else 0
-       char_count = len(combined_text)
-       avg_confidence = total_confidence / max(1, successful_chunks)
-       success_rate = successful_chunks / len(transcript_results) if transcript_results else 0
-
-       # Estimate speaking duration (rough approximation: 150 words per minute)
-       estimated_duration_minutes = word_count / 150 if word_count > 0 else 0
-
-       return {
-           'full_transcript': combined_text,
-           'speaker_tagged_transcript': speaker_formatted_text,
-           'word_count': word_count,
-           'character_count': char_count,
-           'chunk_count': len(transcript_results),
-           'successful_chunks': successful_chunks,
-           'success_rate': success_rate,
-           'average_confidence': avg_confidence,
-           'method_distribution': method_counts,
-           'estimated_duration_minutes': estimated_duration_minutes,
-           'speaker_identification_enabled': enable_speaker_id,
-           'speaker_statistics': speaker_stats,
-           'total_speakers': len([s for s in speaker_stats.keys() if s != 'SPEAKER_UNKNOWN']),
-           'detailed_results': transcript_results
-       }
-
    def extract_transcript(self, audio_path: str, video_hash: str, input_data: Dict[str, Any] = None) -> Dict[str, Any]:
        """Extract complete transcript from audio file"""
        cache_enabled = self._get_config('cache_enabled', True, input_data)
-       enable_speaker_id = self._get_config('enable_speaker_id', True, input_data)
-       cache_suffix = "transcript_with_speakers.json" if enable_speaker_id else "transcript.json"
-       cache_path = self._get_cache_path(video_hash, cache_suffix)

        # Check cache
        cached_transcript = self._load_from_cache(cache_path, cache_enabled)
@@ -2828,7 +2542,6 @@ class YouTubeTranscriptExtractor(BaseTool):
                return {
                    'error': 'Failed to split audio into chunks',
                    'full_transcript': '',
-                   'speaker_tagged_transcript': '',
                    'success_rate': 0.0
                }

@@ -2836,17 +2549,31 @@ class YouTubeTranscriptExtractor(BaseTool):
            print(f"Transcribing {len(chunk_data)} audio chunks...")
            transcript_results = self._transcribe_chunks_parallel(chunk_data, input_data)

-           # Step 3: Post-process and combine results
-           print("Post-processing transcript and identifying speakers...")
-           final_result = self._post_process_transcript(transcript_results, input_data)
-
-           # Add timestamp
-           final_result['extraction_timestamp'] = time.time()
-           final_result['extraction_date'] = time.strftime('%Y-%m-%d %H:%M:%S')

            # Cache results
            self._save_to_cache(cache_path, final_result, cache_enabled)
-
            return final_result

        except Exception as e:
@@ -2854,7 +2581,6 @@ class YouTubeTranscriptExtractor(BaseTool):
            return {
                'error': str(e),
                'full_transcript': '',
-               'speaker_tagged_transcript': '',
                'success_rate': 0.0
            }

@@ -2876,7 +2602,7 @@ class YouTubeTranscriptExtractor(BaseTool):
            print(f"Downloading YouTube audio from {youtube_url}...")
            audio_path = self.download_youtube_audio(youtube_url, video_hash, input_data)
            if not audio_path or not os.path.exists(audio_path):
-               return "Error: Failed to download the YouTube audio."

            # Step 2: Extract transcript
            print("Extracting audio transcript...")
@@ -2885,18 +2611,19 @@ class YouTubeTranscriptExtractor(BaseTool):
            if transcript_result.get("error"):
                return f"Error: {transcript_result['error']}"

-           # Choose the appropriate transcript
-           main_transcript = transcript_result.get('full_transcript')

-           #ipdb.set_trace()
-           print(f"Transcript extracted: {main_transcript[:50]}..." if len(main_transcript) > 50 else f"Transcript extracted: {main_transcript}")

            return "TRANSCRIPT: " + main_transcript

        except Exception as e:
            return f"Error during transcript extraction: {str(e)}"

-
# Factory function to create the tool
def create_youtube_transcript_tool(**kwargs):
    """Factory function to create the transcript extraction tool with custom parameters"""
 
    """Factory function to create the enhanced tool with custom parameters"""
    return EnhancedYoutubeScreenshotQA(**kwargs)

+ import os
+ import json
+ import hashlib
+ import time
+ import shutil
+ import glob
+ from typing import Dict, Any, List, Optional
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+ import yt_dlp
+ import speech_recognition as sr
+ from pydantic import Field
+ from pydantic.v1 import BaseModel
+ from pydub import AudioSegment
+ from pydub.silence import split_on_silence
+
+
+ class BaseTool(BaseModel):
+     name: str
+     description: str
+

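(Aside: the new imports take Field from pydantic's v2 namespace but BaseModel from pydantic.v1. If that mix misbehaves under pydantic 2.x, a v1-consistent variant of the stub would look like the sketch below; it is an assumption, not part of the commit:)

# Hypothetical v1-only variant of the BaseTool stub above.
from pydantic.v1 import BaseModel, Field

class BaseTool(BaseModel):
    name: str
    description: str

    class Config:
        arbitrary_types_allowed = True  # lets the class hold sr.Recognizer
        extra = "allow"                 # permits the instance attrs set in __init__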
class YouTubeTranscriptExtractor(BaseTool):
    name: str = "youtube_transcript_extractor"
    description: str = (
+       "Downloads a YouTube video and extracts the complete audio transcript using speech recognition. "
+       "Use this tool for questions about what people say in YouTube videos. "
        "Input should be a dict with keys: 'youtube_url' and optional parameters. "
+       "Example: {'youtube_url': 'https://youtube.com/watch?v=xyz', 'language': 'en-US'}"
    )

    # Define Pydantic fields for the attributes we need to set
    recognizer: Any = Field(default=None, exclude=True)

    class Config:
        arbitrary_types_allowed = True
        extra = "allow"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Initialize directories
+       self.cache_dir = '/tmp/youtube_transcript_cache/'
+       self.audio_dir = '/tmp/audio/'
+       self.chunks_dir = '/tmp/audio_chunks/'

        # Initialize speech recognizer
        self.recognizer = sr.Recognizer()
+       self.recognizer.energy_threshold = 4000
+       self.recognizer.pause_threshold = 0.8

        # Create directories
+       for dir_path in [self.cache_dir, self.audio_dir, self.chunks_dir]:
            os.makedirs(dir_path, exist_ok=True)

    def _get_config(self, key: str, default_value=None, input_data: Dict[str, Any] = None):
 
            'language': 'en-US',
            'chunk_length_ms': 30000, # 30 seconds
            'silence_thresh': -40, # dB
            'audio_quality': 'best',
            'cache_enabled': True,
            'min_silence_len': 500, # minimum silence length to split on
+           'overlap_ms': 1000, # 1 second overlap between chunks
        }

        if input_data and key in input_data:

    def _get_cache_path(self, video_hash: str, cache_type: str) -> str:
        """Get cache file path"""
+       return os.path.join(self.cache_dir, f"{video_hash}_{cache_type}")

    def _load_from_cache(self, cache_path: str, cache_enabled: bool = True) -> Optional[Any]:
        """Load data from cache"""
 
        except Exception as e:
            print(f"Error saving cache: {str(e)}")

+   def _clean_directory(self, directory: str):
+       """Clean directory contents"""
+       if os.path.exists(directory):
+           for filename in os.listdir(directory):
+               file_path = os.path.join(directory, filename)
+               try:
+                   if os.path.isfile(file_path) or os.path.islink(file_path):
+                       os.unlink(file_path)
+                   elif os.path.isdir(file_path):
+                       shutil.rmtree(file_path)
+               except Exception as e:
+                   print(f'Failed to delete {file_path}. Reason: {e}')
+
    def download_youtube_audio(self, url: str, video_hash: str, input_data: Dict[str, Any] = None) -> Optional[str]:
        """Download YouTube video as audio file"""
        audio_quality = self._get_config('audio_quality', 'best', input_data)
        output_filename = f'{video_hash}.wav'
+       output_path = os.path.join(self.audio_dir, output_filename)

        # Check cache
        cache_enabled = self._get_config('cache_enabled', True, input_data)
            return output_path

        # Clean directory
+       self._clean_directory(self.audio_dir)

        try:
+           # Updated yt-dlp configuration for better compatibility
            ydl_opts = {
+               'format': 'bestaudio[ext=m4a]/bestaudio/best',
+               'outtmpl': os.path.join(self.audio_dir, f'{video_hash}.%(ext)s'),
+               'quiet': False, # Set to False for debugging
+               'no_warnings': False,
+               'extract_flat': False,
+               'writethumbnail': False,
+               'writeinfojson': False,
+               'postprocessors': [{
+                   'key': 'FFmpegExtractAudio',
+                   'preferredcodec': 'wav',
+                   'preferredquality': '192' if audio_quality == 'best' else '128',
+               }],
+               # Add user agent and headers to avoid blocking
+               'http_headers': {
+                   'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
+               },
+               # Add cookie handling
+               'cookiefile': None,
+               'nocheckcertificate': True,
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+               print(f"Downloading audio from: {url}")
                ydl.download([url])

+           # Check if the output file exists
            if os.path.exists(output_path):
+               print(f"Audio downloaded successfully: {output_path}")
                return output_path
            else:
+               # Look for any downloaded file with the video hash
+               possible_files = glob.glob(os.path.join(self.audio_dir, f'{video_hash}.*'))
+               if possible_files:
+                   # Convert to WAV if needed
+                   source_file = possible_files[0]
+                   if not source_file.endswith('.wav'):
+                       try:
+                           audio = AudioSegment.from_file(source_file)
+                           audio.export(output_path, format="wav")
+                           os.remove(source_file) # Clean up original
+                           print(f"Audio converted to WAV: {output_path}")
+                           return output_path
+                       except Exception as e:
+                           print(f"Error converting audio: {str(e)}")
+                           return None
+                   else:
+                       return source_file
+
+               print("No audio file found after download")
                return None

        except Exception as e:
            print(f"Error downloading YouTube audio: {str(e)}")
+           # Try alternative format as fallback
+           try:
+               print("Trying alternative download method...")
+               fallback_opts = {
+                   'format': 'worst[ext=mp4]',
+                   'outtmpl': os.path.join(self.audio_dir, f'{video_hash}_fallback.%(ext)s'),
+                   'quiet': False,
+               }
+
+               with yt_dlp.YoutubeDL(fallback_opts) as ydl:
+                   ydl.download([url])
+
+               # Look for fallback file and convert
+               fallback_files = glob.glob(os.path.join(self.audio_dir, f'{video_hash}_fallback.*'))
+               if fallback_files:
+                   source_file = fallback_files[0]
+                   try:
+                       audio = AudioSegment.from_file(source_file)
+                       audio.export(output_path, format="wav")
+                       os.remove(source_file)
+                       print(f"Fallback audio converted: {output_path}")
+                       return output_path
+                   except Exception as conv_e:
+                       print(f"Error converting fallback audio: {str(conv_e)}")
+
+           except Exception as fallback_e:
+               print(f"Fallback download also failed: {str(fallback_e)}")
+
            return None
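(Aside: the download path above leans on yt-dlp's FFmpegExtractAudio postprocessor to emit the final wav. A minimal standalone sketch of the same configuration, with a hypothetical URL and output directory, assuming yt-dlp and ffmpeg are installed:)

# Minimal sketch of the yt-dlp audio download used above (hypothetical paths).
import yt_dlp

opts = {
    'format': 'bestaudio[ext=m4a]/bestaudio/best',
    'outtmpl': '/tmp/audio/%(id)s.%(ext)s',
    'postprocessors': [{
        'key': 'FFmpegExtractAudio',  # runs ffmpeg after the download finishes
        'preferredcodec': 'wav',
        'preferredquality': '192',
    }],
}
with yt_dlp.YoutubeDL(opts) as ydl:
    ydl.download(['https://youtube.com/watch?v=xyz'])  # writes /tmp/audio/<id>.wav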
 
    def _split_audio_intelligent(self, audio_path: str, input_data: Dict[str, Any] = None) -> List[Dict[str, Any]]:
+       """Split audio into chunks intelligently based on silence"""
+       self._clean_directory(self.chunks_dir)

        try:
            # Load audio

            # Save chunks and create metadata
            chunk_data = []
+           current_time = 0
+
            for i, chunk in enumerate(chunks):
                if len(chunk) < 1000: # Skip very short chunks
                    continue

+               chunk_filename = os.path.join(self.chunks_dir, f"chunk_{i:04d}.wav")
                chunk.export(chunk_filename, format="wav")

                duration = len(chunk) / 1000.0 # in seconds

                chunk_info = {
                    'filename': chunk_filename,
                    'index': i,
+                   'start_time': current_time,
                    'duration': duration,
+                   'end_time': current_time + duration
                }

                chunk_data.append(chunk_info)
+               current_time += duration

            print(f"Split audio into {len(chunk_data)} chunks")
            return chunk_data

        except Exception as e:
            print(f"Error splitting audio: {str(e)}")
+           # Fallback: return original file as single chunk
+           try:
+               audio = AudioSegment.from_wav(audio_path)
+               duration = len(audio) / 1000.0
+               return [{
+                   'filename': audio_path,
+                   'index': 0,
+                   'start_time': 0,
+                   'duration': duration,
+                   'end_time': duration
+               }]
+           except:
+               return []
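(Aside: the silence-based chunking above comes from pydub. A short sketch of the call with this tool's defaults, min_silence_len=500 and silence_thresh=-40; the input path and keep_silence value are illustrative:)

# Sketch of pydub silence splitting with this tool's default parameters.
from pydub import AudioSegment
from pydub.silence import split_on_silence

audio = AudioSegment.from_wav('/tmp/audio/example.wav')  # hypothetical file
chunks = split_on_silence(
    audio,
    min_silence_len=500,  # ms of silence that triggers a split
    silence_thresh=-40,   # dBFS level treated as silence
    keep_silence=200,     # pad chunks so words are not clipped (illustrative)
)
if chunks:
    print(f"{len(chunks)} chunks, first is {len(chunks[0]) / 1000.0:.1f}s")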
 
    def _transcribe_audio_chunk(self, chunk_info: Dict[str, Any], input_data: Dict[str, Any] = None) -> Dict[str, Any]:
        """Transcribe a single audio chunk"""
        try:
            language = self._get_config('language', 'en-US', input_data)

            with sr.AudioFile(chunk_path) as source:
                # Adjust for ambient noise
                self.recognizer.adjust_for_ambient_noise(source, duration=0.5)
                audio_data = self.recognizer.record(source)

+           # Try Google Speech Recognition
            try:
                text = self.recognizer.recognize_google(audio_data, language=language)
+               return {
                    'text': text,
+                   'confidence': 1.0,
                    'start_time': chunk_info['start_time'],
                    'end_time': chunk_info['end_time'],
                    'duration': chunk_info['duration'],
+                   'index': chunk_info['index'],
+                   'success': True
                }

            except sr.UnknownValueError:
+               # Try without language specification
                try:
                    text = self.recognizer.recognize_google(audio_data)
+                   return {
                        'text': text,
+                       'confidence': 0.8,
                        'start_time': chunk_info['start_time'],
                        'end_time': chunk_info['end_time'],
                        'duration': chunk_info['duration'],
+                       'index': chunk_info['index'],
+                       'success': True
                    }
                except sr.UnknownValueError:
                    return {
                        'text': '[INAUDIBLE]',
                        'confidence': 0.0,
                        'start_time': chunk_info['start_time'],
                        'end_time': chunk_info['end_time'],
                        'duration': chunk_info['duration'],
                        'index': chunk_info['index'],
+                       'success': False
                    }
            except sr.RequestError as e:
                return {
+                   'text': f'[RECOGNITION_ERROR: {str(e)}]',
                    'confidence': 0.0,
                    'start_time': chunk_info['start_time'],
                    'end_time': chunk_info['end_time'],
                    'duration': chunk_info['duration'],
                    'index': chunk_info['index'],
+                   'success': False,
+                   'error': str(e)
                }

        except Exception as e:
            return {
+               'text': f'[ERROR: {str(e)}]',
                'confidence': 0.0,
                'start_time': chunk_info.get('start_time', 0),
                'end_time': chunk_info.get('end_time', 0),
                'duration': chunk_info.get('duration', 0),
                'index': chunk_info.get('index', 0),
+               'success': False,
+               'error': str(e)
            }
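(Aside: each chunk is decoded through the SpeechRecognition library's Google Web Speech backend. A compact sketch of that round trip on a hypothetical chunk file:)

# Sketch of the per-chunk recognition flow used above (hypothetical path).
import speech_recognition as sr

recognizer = sr.Recognizer()
with sr.AudioFile('/tmp/audio_chunks/chunk_0000.wav') as source:
    recognizer.adjust_for_ambient_noise(source, duration=0.5)
    audio_data = recognizer.record(source)
try:
    print(recognizer.recognize_google(audio_data, language='en-US'))
except sr.UnknownValueError:
    print('[INAUDIBLE]')  # speech present but not decodable
except sr.RequestError as e:
    print(f'[RECOGNITION_ERROR: {e}]')  # network or API failure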
 
    def _transcribe_chunks_parallel(self, chunk_data: List[Dict[str, Any]], input_data: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Transcribe audio chunks in parallel"""
        results = []
+
+       # Use fewer workers to avoid API rate limits
+       max_workers = min(3, len(chunk_data))
+
+       with ThreadPoolExecutor(max_workers=max_workers) as executor:
+           future_to_chunk = {
+               executor.submit(self._transcribe_audio_chunk, chunk_info, input_data): chunk_info
+               for chunk_info in chunk_data
+           }

+           for future in as_completed(future_to_chunk):
+               chunk_info = future_to_chunk[future]
+               try:
+                   result = future.result()
+                   results.append(result)
+                   if result['success']:
+                       preview = result['text'][:50] + "..." if len(result['text']) > 50 else result['text']
+                       print(f"Transcribed chunk {result['index']}: {preview}")
+                   else:
+                       print(f"Failed to transcribe chunk {result['index']}: {result['text']}")
+               except Exception as e:
+                   print(f"Error processing chunk {chunk_info.get('index', '?')}: {str(e)}")
+                   results.append({
+                       'text': f'[PROCESSING_ERROR: {str(e)}]',
+                       'confidence': 0.0,
+                       'start_time': chunk_info.get('start_time', 0),
+                       'end_time': chunk_info.get('end_time', 0),
+                       'duration': chunk_info.get('duration', 0),
+                       'index': chunk_info.get('index', 0),
+                       'success': False,
+                       'error': str(e)
+                   })

        # Sort results by chunk index to maintain order
        results.sort(key=lambda x: x['index'])
        return results
 
    def extract_transcript(self, audio_path: str, video_hash: str, input_data: Dict[str, Any] = None) -> Dict[str, Any]:
        """Extract complete transcript from audio file"""
        cache_enabled = self._get_config('cache_enabled', True, input_data)
+       cache_path = self._get_cache_path(video_hash, "transcript.json")

        # Check cache
        cached_transcript = self._load_from_cache(cache_path, cache_enabled)
                return {
                    'error': 'Failed to split audio into chunks',
                    'full_transcript': '',
                    'success_rate': 0.0
                }

            print(f"Transcribing {len(chunk_data)} audio chunks...")
            transcript_results = self._transcribe_chunks_parallel(chunk_data, input_data)

+           # Step 3: Combine results
+           successful_results = [r for r in transcript_results if r['success']]
+           full_text = ' '.join([r['text'] for r in successful_results])
+
+           # Calculate statistics
+           total_chunks = len(transcript_results)
+           successful_chunks = len(successful_results)
+           success_rate = successful_chunks / total_chunks if total_chunks > 0 else 0
+           word_count = len(full_text.split()) if full_text else 0
+
+           final_result = {
+               'full_transcript': full_text,
+               'word_count': word_count,
+               'total_chunks': total_chunks,
+               'successful_chunks': successful_chunks,
+               'success_rate': success_rate,
+               'extraction_timestamp': time.time(),
+               'extraction_date': time.strftime('%Y-%m-%d %H:%M:%S'),
+               'detailed_results': transcript_results
+           }

            # Cache results
            self._save_to_cache(cache_path, final_result, cache_enabled)
+
+           print(f"Transcript extraction completed. Success rate: {success_rate:.1%}")
            return final_result

        except Exception as e:
            return {
                'error': str(e),
                'full_transcript': '',
                'success_rate': 0.0
            }
 
 
            print(f"Downloading YouTube audio from {youtube_url}...")
            audio_path = self.download_youtube_audio(youtube_url, video_hash, input_data)
            if not audio_path or not os.path.exists(audio_path):
+               return "Error: Failed to download the YouTube audio. Please check the URL and try again."

            # Step 2: Extract transcript
            print("Extracting audio transcript...")

            if transcript_result.get("error"):
                return f"Error: {transcript_result['error']}"

+           main_transcript = transcript_result.get('full_transcript', '')
+
+           if not main_transcript:
+               return "Error: No transcript could be extracted from the audio."

+           print(f"Transcript extracted successfully. Word count: {transcript_result.get('word_count', 0)}")
+           print(f"Success rate: {transcript_result.get('success_rate', 0):.1%}")

            return "TRANSCRIPT: " + main_transcript

        except Exception as e:
            return f"Error during transcript extraction: {str(e)}"

# Factory function to create the tool
def create_youtube_transcript_tool(**kwargs):
    """Factory function to create the transcript extraction tool with custom parameters"""