Spaces:
Build error
Build error
| import gradio as gr | |
| from gradio import utils | |
| import os | |
| import re | |
| import requests | |
| from concurrent.futures import ThreadPoolExecutor | |
| import time | |
| from yt_dlp import YoutubeDL | |
| import subprocess | |
| import shutil | |
| from typing import List, Tuple | |
| import pandas as pd | |
def sanitize_title(title):
    """Return *title* with a fixed set of punctuation characters removed.

    The characters dropped are backslash, slash, asterisk, question mark,
    colon, double quote, angle brackets and pipe. All other characters are
    kept unchanged and in order.
    """
    stripped_chars = re.compile(r'[\\/*?:"<>|]')
    return stripped_chars.sub("", title)
def format_time(seconds):
    """Render a number of seconds as an ``HH:MM:SS`` string.

    The value is converted through ``time.gmtime``, so only the time-of-day
    part is formatted: values of 24 hours or more wrap around.
    """
    broken_down = time.gmtime(seconds)
    return time.strftime('%H:%M:%S', broken_down)
def get_video_info(video_url):
    """Resolve the title and direct stream URL(s) for *video_url*.

    Returns a ``(title, video_url, audio_url)`` tuple. When both video-only
    and audio-only formats are listed, the highest ``vbr`` video and highest
    ``abr`` audio formats are chosen. Otherwise the highest ``tbr`` format
    carrying both video and audio is used and ``audio_url`` is ``None``.

    Raises:
        Exception: with the message ``Error extracting video info: ...`` for
            any failure, including the case where no usable format exists.
    """
    def bitrate_of(fmt, field):
        # A missing key and an explicit None both count as a bitrate of 0.
        return fmt.get(field, 0) or 0

    ydl_options = {'quiet': True, 'no_warnings': True}
    with YoutubeDL(ydl_options) as ydl:
        try:
            info = ydl.extract_info(video_url, download=False)

            # Sort every format into video-only, audio-only or muxed in one pass.
            video_only, audio_only, muxed = [], [], []
            for fmt in info.get('formats', []):
                has_video = fmt.get('vcodec') != 'none'
                has_audio = fmt.get('acodec') != 'none'
                if has_video and has_audio:
                    muxed.append(fmt)
                elif has_video:
                    video_only.append(fmt)
                elif has_audio:
                    audio_only.append(fmt)

            # Prefer adaptive formats (separate video and audio streams).
            if video_only and audio_only:
                best_video = max(video_only, key=lambda f: bitrate_of(f, 'vbr'))
                best_audio = max(audio_only, key=lambda f: bitrate_of(f, 'abr'))
                return info['title'], best_video['url'], best_audio['url']

            # Fall back to the best format that already carries both streams.
            if not muxed:
                raise Exception("No suitable video formats found")
            best_muxed = max(muxed, key=lambda f: bitrate_of(f, 'tbr'))
            return info['title'], best_muxed['url'], None
        except Exception as e:
            raise Exception(f"Error extracting video info: {str(e)}")
def download_segment(url, start_time, end_time, output_path):
    """Copy the ``[start_time, end_time)`` slice of *url* to *output_path* with ffmpeg.

    This is a generator: it yields each line ffmpeg writes to stderr (stripped)
    while the transfer runs, so the caller can surface progress. The generator's
    return value (``StopIteration.value``) is ``True`` when ffmpeg exited with
    status 0 and ``False`` otherwise.

    Args:
        url: Input passed to ffmpeg's ``-i`` option.
        start_time: Slice start in seconds.
        end_time: Slice end in seconds.
        output_path: File written by ffmpeg (overwritten if present).
    """
    command = [
        'ffmpeg',
        '-ss', format_time(start_time),
        '-i', url,
        '-t', format_time(end_time - start_time),
        '-c', 'copy',
        '-y',
        output_path
    ]
    # stdout is never consumed here, so it is discarded instead of piped: an
    # unread pipe can fill up and stall the child process.
    process = subprocess.Popen(
        command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    try:
        # Iterating the pipe blocks until a line is available and stops at EOF,
        # which avoids spinning on poll() once stderr has been closed.
        for line in process.stderr:
            yield line.strip()
        return process.wait() == 0
    finally:
        # Runs on normal completion and also when the consumer abandons the
        # generator early; in the latter case ffmpeg is still running.
        if process.poll() is None:
            process.kill()
            process.wait()
        process.stderr.close()
def _write_concat_list(list_path, segment_paths):
    """Write an ffmpeg concat-demuxer list file naming *segment_paths* in order."""
    with open(list_path, 'w', encoding='utf-8') as f:
        for segment in segment_paths:
            # Absolute paths are used because the list file lives in a scratch
            # directory. A single quote inside a quoted entry must be written
            # as '\'' or the list fails to parse.
            escaped = os.path.abspath(segment).replace("'", "'\\''")
            f.write(f"file '{escaped}'\n")


def combine_segments(video_segments, audio_segments, output_path):
    """Concatenate downloaded segments into a single file at *output_path*.

    Video segments are joined with ffmpeg's concat demuxer using stream copy.
    When *audio_segments* is non-empty they are joined the same way and then
    muxed with the joined video; otherwise the joined video is moved to
    *output_path* as is.

    All intermediate files live in a temporary directory created next to
    *output_path*, so parallel or previously failed runs cannot collide, and
    they are removed even when a step fails.

    Raises:
        subprocess.CalledProcessError: if any ffmpeg invocation exits non-zero.
    """
    import tempfile  # local import: only this function needs it

    output_parent = os.path.dirname(os.path.abspath(output_path))
    os.makedirs(output_parent, exist_ok=True)

    with tempfile.TemporaryDirectory(dir=output_parent) as work_dir:
        temp_video = os.path.join(work_dir, 'temp_video.mp4')
        temp_audio = os.path.join(work_dir, 'temp_audio.m4a')
        video_list = os.path.join(work_dir, 'video_list.txt')
        audio_list = os.path.join(work_dir, 'audio_list.txt')

        # Concatenate video segments
        _write_concat_list(video_list, video_segments)
        subprocess.run(
            ['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', video_list, '-c', 'copy', temp_video],
            check=True,
        )

        if audio_segments:
            # Concatenate audio segments, then mux them with the joined video
            _write_concat_list(audio_list, audio_segments)
            subprocess.run(
                ['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', audio_list, '-c', 'copy', temp_audio],
                check=True,
            )
            subprocess.run(
                ['ffmpeg', '-y', '-i', temp_video, '-i', temp_audio, '-c', 'copy', output_path],
                check=True,
            )
        else:
            shutil.move(temp_video, output_path)
def add_segment(start_hours, start_minutes, start_seconds, end_hours, end_minutes, end_seconds, segments):
    """Append a ``HH:MM:SS-HH:MM:SS`` row to the *segments* table.

    Args:
        start_hours, start_minutes, start_seconds: Start of the segment.
        end_hours, end_minutes, end_seconds: End of the segment.
        segments: DataFrame with a ``Segment`` column, or ``None``.

    Returns:
        A new DataFrame with the formatted segment appended as the last row;
        the input frame is not modified.
    """
    def clock(hours, minutes, seconds):
        # Numeric inputs may arrive as floats (e.g. 5.0) or as None for an
        # emptied field; the 'd' format code accepts neither, so coerce first.
        return ":".join(f"{int(part or 0):02d}" for part in (hours, minutes, seconds))

    start_time = clock(start_hours, start_minutes, start_seconds)
    end_time = clock(end_hours, end_minutes, end_seconds)
    new_row = pd.DataFrame([f"{start_time}-{end_time}"], columns=["Segment"])
    if segments is None:
        return new_row
    return pd.concat([segments, new_row], ignore_index=True)
def remove_segment(segments, index):
    """Return *segments* without the row at position *index*.

    *index* is a zero-based row position and may be a whole-number float.
    If it is not a number or lies outside the table, the table is returned
    unchanged, matching how ``move_segment`` treats invalid positions.
    """
    try:
        position = int(index)
    except (TypeError, ValueError):
        return segments
    if not 0 <= position < len(segments):
        return segments
    # Translate the position to its label so the drop is correct even when the
    # frame's index is not a plain 0..n-1 range.
    return segments.drop(segments.index[position]).reset_index(drop=True)
def move_segment(segments, old_index, new_index):
    """Move the row at *old_index* so that it ends up at *new_index*.

    Both positions must lie inside the table; otherwise the table is returned
    unchanged. On success a re-indexed DataFrame is returned.
    """
    in_range = 0 <= old_index < len(segments) and 0 <= new_index < len(segments)
    if not in_range:
        return segments

    moved_row = segments.iloc[old_index]
    remaining = segments.drop(old_index).reset_index(drop=True)
    # Re-assemble: rows before the target slot, the moved row, then the rest.
    pieces = [
        remaining.iloc[:new_index],
        pd.DataFrame([moved_row]),
        remaining.iloc[new_index:],
    ]
    return pd.concat(pieces).reset_index(drop=True)
def parse_segments(segments: pd.DataFrame) -> List[Tuple[int, int]]:
    """Convert the ``Segment`` column into ``(start_seconds, end_seconds)`` pairs.

    Each usable entry is a string of the form ``start-end`` where both sides
    are colon-separated integer fields (seconds, then minutes, then hours,
    counted from the right). Entries that are not strings, contain no ``-``,
    fail to parse, or do not start strictly before they end are skipped.
    """
    def to_seconds(clock: str) -> int:
        # Base-60 accumulation from the most significant field; an empty
        # field contributes nothing.
        total = 0
        for field in clock.split(':'):
            total = total * 60 + (int(field) if field else 0)
        return total

    ranges = []
    for entry in segments['Segment']:
        if not (isinstance(entry, str) and '-' in entry):
            continue
        try:
            begin_text, finish_text = entry.split('-')
            begin, finish = to_seconds(begin_text), to_seconds(finish_text)
        except ValueError:
            continue  # Skip invalid segments
        if begin < finish:
            ranges.append((begin, finish))
    return ranges
def process_video(video_url, segments, combine, progress=gr.Progress()):
    """Download the requested segments of a video and optionally join them.

    This is a generator. Every update, including error reports, is yielded as
    a ``(percent, status_message, output_path_or_None)`` tuple.

    Args:
        video_url: Page URL of the video to process.
        segments: DataFrame with a ``Segment`` column of ``start-end`` strings.
        combine: When true, join all segments into one file and delete the
            individual segment files afterwards.
        progress: Progress callback, invoked with a fraction between 0 and 1.
    """
    # Errors must be yielded: a value given to `return` inside a generator is
    # never delivered to whoever iterates it.
    if not video_url or not video_url.strip():
        yield 0, "Error: Please provide a valid YouTube URL", None
        return

    # Extract segments from the Dataframe
    parsed_segments = parse_segments(segments)
    if not parsed_segments:
        yield 0, "Error: No valid segments provided", None
        return

    output_dir = 'output'
    os.makedirs(output_dir, exist_ok=True)

    try:
        video_title, video_url, audio_url = get_video_info(video_url)
    except Exception as e:
        yield 0, f"Error: {str(e)}", None
        return

    safe_title = sanitize_title(video_title)
    video_segments = []
    audio_segments = []
    total_segments = len(parsed_segments)

    for i, (start_time, end_time) in enumerate(parsed_segments):
        # Segment i owns the percentage range starting at video_percent; the
        # audio phase is reported from the midpoint of that range, which keeps
        # the value below 100 for every segment.
        video_percent = i * 100 // total_segments
        audio_percent = (2 * i + 1) * 50 // total_segments

        video_output = os.path.join(output_dir, f"{safe_title}_video_segment_{i+1}.mp4")
        for output in download_segment(video_url, start_time, end_time, video_output):
            progress((i / total_segments) + (1 / total_segments) * 0.5)
            yield video_percent, f"Downloading video segment {i+1}/{total_segments}: {output}", None
        video_segments.append(video_output)

        if audio_url:
            audio_output = os.path.join(output_dir, f"{safe_title}_audio_segment_{i+1}.m4a")
            for output in download_segment(audio_url, start_time, end_time, audio_output):
                progress((i / total_segments) + (1 / total_segments) * 0.75)
                yield audio_percent, f"Downloading audio segment {i+1}/{total_segments}: {output}", None
            audio_segments.append(audio_output)

    if combine:
        output_path = os.path.join(output_dir, f"{safe_title}_combined.mp4")
        combine_segments(video_segments, audio_segments, output_path)
        yield 100, f"Segments combined and saved as {output_path}", output_path
    else:
        # If not combining, return the first video segment (you might want to modify this behavior)
        output_path = video_segments[0] if video_segments else None
        yield 100, "All segments downloaded successfully", output_path

    # Clean up individual segments if combined
    if combine:
        for segment in video_segments + audio_segments:
            os.remove(segment)
# NOTE(review): this was labelled "Disable Gradio analytics", but the statement
# replaces gradio.utils.colab_check with a stub that always returns True.
# Confirm the intent before relying on it for analytics.
utils.colab_check = lambda: True

# UI definition. Components appear on the page in the order they are created.
with gr.Blocks(title="Advanced YouTube Segment Downloader", theme=gr.themes.Soft()) as iface:
    gr.Markdown("## Advanced YouTube Segment Downloader")
    gr.Markdown("Download segments of YouTube videos using adaptive streaming and ffmpeg, with optional combining.")

    with gr.Row():
        video_url = gr.Textbox(label="YouTube URL", placeholder="Enter YouTube URL here")

    with gr.Row():
        with gr.Column(scale=1):
            # precision=0 makes each field deliver an int, which add_segment's
            # zero-padded integer formatting requires.
            with gr.Row():
                start_hours = gr.Number(label="Start Hours", minimum=0, maximum=23, step=1, value=0, precision=0)
                start_minutes = gr.Number(label="Start Minutes", minimum=0, maximum=59, step=1, value=0, precision=0)
                start_seconds = gr.Number(label="Start Seconds", minimum=0, maximum=59, step=1, value=0, precision=0)
            with gr.Row():
                end_hours = gr.Number(label="End Hours", minimum=0, maximum=23, step=1, value=0, precision=0)
                end_minutes = gr.Number(label="End Minutes", minimum=0, maximum=59, step=1, value=0, precision=0)
                end_seconds = gr.Number(label="End Seconds", minimum=0, maximum=59, step=1, value=0, precision=0)
            add_btn = gr.Button("Add Segment")
        with gr.Column(scale=2):
            segments = gr.Dataframe(
                headers=["Segment"],
                row_count=5,
                col_count=1,
                datatype=["str"],
                interactive=True,
                label="Segments"
            )

    combine = gr.Checkbox(label="Combine Segments")
    submit_btn = gr.Button("Download Segments", variant="primary")
    progress = gr.Slider(label="Progress", minimum=0, maximum=100, step=1)
    status = gr.Textbox(label="Status", lines=10)
    output_file = gr.File(label="Download Video")

    # Row-management controls, created explicitly rather than inline in an
    # event's input list.
    with gr.Row():
        old_index = gr.Slider(0, 100, step=1, label="Old Index")
        new_index = gr.Slider(0, 100, step=1, label="New Index")
        move_btn = gr.Button("Move Segment")
    with gr.Row():
        remove_index = gr.Slider(0, 100, step=1, label="Index to Remove")
        remove_btn = gr.Button("Remove Selected Segment")

    add_btn.click(
        add_segment,
        inputs=[start_hours, start_minutes, start_seconds, end_hours, end_minutes, end_seconds, segments],
        outputs=[segments]
    )
    submit_btn.click(
        process_video,
        inputs=[video_url, segments, combine],
        outputs=[progress, status, output_file]
    )
    # The move is bound to a button. Binding it to segments.change would let
    # the handler's own write to `segments` trigger the handler again.
    move_btn.click(
        move_segment,
        inputs=[segments, old_index, new_index],
        outputs=[segments]
    )
    remove_btn.click(
        remove_segment,
        inputs=[segments, remove_index],
        outputs=[segments]
    )

iface.launch()