# Spaces: Sleeping  (page-status banner captured together with the file; not part of the program)
import asyncio
import contextlib
import glob
import os
import shutil
import subprocess
import uuid

import gradio as gr
# --- Module configuration ---------------------------------------------------

# The bundled ./fdkaac binary must be executable before the AAC path can run
# it. Set the execute bits directly instead of spawning a shell for `chmod`.
# A missing/unwritable binary is reported but not fatal, matching the previous
# best-effort shell call (only the fdkaac path would fail later).
try:
    os.chmod("fdkaac", os.stat("fdkaac").st_mode | 0o111)
except OSError as exc:
    print(f"[WARN] Could not mark fdkaac as executable: {exc}")

# Value handed to ffmpeg's `-hwaccel` option by the video encoding commands.
accel = 'auto'

# Default video encoding options, used when no custom bitrate is requested.
video_base_opts = ['-crf', '63', '-c:v', 'libx264', '-tune', 'zerolatency']

# Working directories: staged inputs and conversion results respectively.
UPLOAD_FOLDER = 'uploads'
CONVERTED_FOLDER = 'converted'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(CONVERTED_FOLDER, exist_ok=True)
async def run_subprocess(cmd, use_fdkaac=False):
    """Run *cmd* asynchronously and return its decoded output.

    Args:
        cmd: Argument list; executed directly, without a shell.
        use_fdkaac: When true, the current working directory is prepended to
            ``LD_LIBRARY_PATH`` in the child's environment so the bundled
            fdkaac binary can find shared libraries shipped next to it.

    Returns:
        A ``(stdout, stderr)`` tuple of strings.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status. ``output`` carries the captured stdout and ``stderr`` the
            captured stderr.
    """
    env = os.environ.copy()
    if use_fdkaac:
        search_path = [os.path.abspath("./")]
        inherited = env.get("LD_LIBRARY_PATH", "")
        if inherited:
            search_path.append(inherited)
        env["LD_LIBRARY_PATH"] = ":".join(search_path)

    print(f"[DEBUG] Running command:\n{' '.join(cmd)}\n")
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=env,
    )
    raw_out, raw_err = await process.communicate()

    # External tools may emit bytes that are not valid UTF-8 (e.g. media
    # metadata); replace them rather than failing on a decode error.
    out_text = raw_out.decode(errors="replace")
    err_text = raw_err.decode(errors="replace")

    if process.returncode != 0:
        print(f"[ERROR] Command failed:\n{err_text}\n")
        # Pass stdout/stderr by keyword so each lands in its own attribute.
        raise subprocess.CalledProcessError(
            process.returncode, cmd, output=out_text, stderr=err_text
        )

    print(f"[DEBUG] Command succeeded:\n{out_text}\n")
    return out_text, err_text
def _video_encode_opts(downscale, faster, custom_bitrate, video_bitrate):
    """Build the ffmpeg video options shared by the MP3 and fdkaac paths."""
    opts = []
    if downscale:
        # Height 144; width -2 keeps the aspect ratio and rounds to an even
        # number, which libx264 needs for its default pixel format.
        opts += ['-vf', 'scale=-2:144']
    if custom_bitrate:
        if video_bitrate is None or int(video_bitrate) <= 0:
            raise ValueError("Custom video bitrate must be a positive number of kbps.")
        opts += ['-b:v', f"{int(video_bitrate)}k"]
    else:
        opts += video_base_opts
    if faster:
        opts += ['-preset', 'ultrafast']
    return opts


def _discard_files(paths):
    """Delete every path in *paths*, ignoring files that do not exist."""
    for path in paths:
        with contextlib.suppress(FileNotFoundError):
            os.remove(path)


async def convert_video_task(input_path, downscale, faster, use_mp3, audio_only, custom_bitrate, video_bitrate):
    """Convert *input_path* into a deliberately low-quality audio or video file.

    Args:
        input_path: Path of the media file to convert.
        downscale: Scale the video down to 144 pixels high.
        faster: Add ``-preset ultrafast`` to the video encode.
        use_mp3: Encode audio with libmp3lame in a single ffmpeg pass instead
            of the split / fdkaac / merge pipeline.
        audio_only: Produce only an audio file; no video is encoded.
        custom_bitrate: Use ``video_bitrate`` instead of the default options.
        video_bitrate: Target video bitrate in kbps (used with custom_bitrate).

    Returns:
        ``(audio_path, None)`` when ``audio_only`` is set, otherwise
        ``(None, video_path)``. Results are written to ``CONVERTED_FOLDER``.

    Raises:
        ValueError: If a custom bitrate is requested but missing or not positive.
        subprocess.CalledProcessError: If ffmpeg or fdkaac fails.
    """
    # Build (and validate) the video options first so bad settings fail
    # before any encoding work starts. Audio-only runs never touch video.
    video_opts = None
    if not audio_only:
        video_opts = _video_encode_opts(downscale, faster, custom_bitrate, video_bitrate)

    if use_mp3:
        mp3_audio_opts = ['-c:a', 'libmp3lame', '-b:a', '8k', '-ar', '24000', '-ac', '1']
        if audio_only:
            # Audio-only MP3
            output_audio = os.path.join(CONVERTED_FOLDER, f"{uuid.uuid4()}.mp3")
            await run_subprocess([
                'ffmpeg', '-y', '-i', input_path, '-vn',
                *mp3_audio_opts,
                output_audio,
            ])
            return output_audio, None

        # Video+MP3, no splitting; encode video with MP3 audio embedded
        output_video = os.path.join(CONVERTED_FOLDER, f"{uuid.uuid4()}.mp4")
        await run_subprocess([
            'ffmpeg', '-y', '-hwaccel', accel, '-i', input_path,
            *video_opts,
            *mp3_audio_opts,
            output_video,
        ])
        return None, output_video

    # fdkaac path: split, compress audio, (optionally) compress video, merge
    audio_wav = os.path.join(CONVERTED_FOLDER, f"{uuid.uuid4()}.wav")
    audio_output = os.path.join(CONVERTED_FOLDER, f"{uuid.uuid4()}.m4a")
    video_output = os.path.join(CONVERTED_FOLDER, f"{uuid.uuid4()}.mp4")
    intermediates = [audio_wav, audio_output, video_output]
    try:
        # Extract mono WAV audio
        await run_subprocess([
            'ffmpeg', '-y', '-i', input_path, '-ac', '1', '-ar', '8000', audio_wav
        ])
        # Compress WAV with fdkaac (flags kept verbatim from the original command)
        await run_subprocess([
            './fdkaac', '-b', '1k', '-C', '-f', '2', '-G', '1', '-w', '8000',
            '-o', audio_output, audio_wav
        ], use_fdkaac=True)

        if audio_only:
            # The compressed audio is the result: keep it, drop the rest.
            intermediates.remove(audio_output)
            return audio_output, None

        # Compress video without audio
        await run_subprocess([
            'ffmpeg', '-y', '-hwaccel', accel, '-i', input_path,
            *video_opts,
            '-an', video_output,
        ])

        # Merge video + audio
        merged_output = os.path.join(CONVERTED_FOLDER, f"{uuid.uuid4()}.mp4")
        await run_subprocess([
            'ffmpeg', '-y', '-i', video_output, '-i', audio_output, '-c', 'copy', merged_output
        ])
        return None, merged_output
    finally:
        # Runs on success and on failure so intermediates never pile up.
        _discard_files(intermediates)
async def process_conversion(use_youtube, youtube_url, video_file, downscale, faster, use_mp3, audio_only, custom_bitrate, video_bitrate):
    """Stage the input (YouTube download or uploaded file) and convert it.

    Args:
        use_youtube: Download the input with yt-dlp instead of using the upload.
        youtube_url: URL to download when ``use_youtube`` is set.
        video_file: Uploaded file; either an object exposing ``.name`` (a path)
            or a plain path string.
        downscale, faster, use_mp3, audio_only, custom_bitrate, video_bitrate:
            Forwarded unchanged to ``convert_video_task``.

    Returns:
        ``(path, path)`` with the result file repeated for both outputs on
        success, or ``(message, None)`` when validation or conversion fails.
        NOTE(review): the first element is wired to a video preview component
        by the UI, so an error message ends up there — confirm that is the
        intended way to surface errors.
    """
    staged_glob = None  # glob pattern matching the input file(s) staged by this call
    try:
        if use_youtube:
            url = (youtube_url or "").strip()
            if not url:
                return "Error: YouTube URL required.", None
            yt_uuid = str(uuid.uuid4())
            yt_out = os.path.join(UPLOAD_FOLDER, yt_uuid + ".%(ext)s")
            staged_glob = os.path.join(UPLOAD_FOLDER, yt_uuid + ".*")
            # `--` ends option parsing so user-supplied text starting with
            # "-" cannot be interpreted as a yt-dlp option.
            await run_subprocess(['yt-dlp', '-o', yt_out, '-f', 'b', '--', url])
            files = glob.glob(staged_glob)
            if not files:
                return "Download failed.", None
            input_path = files[0]
        else:
            if not video_file:
                return "No video provided.", None
            # Accept both file-like upload objects and plain path strings.
            source_path = getattr(video_file, "name", video_file)
            ext = os.path.splitext(source_path)[1]
            input_path = os.path.join(UPLOAD_FOLDER, f"{uuid.uuid4()}{ext}")
            staged_glob = glob.escape(input_path)
            shutil.copy2(source_path, input_path)

        audio_out, video_out = await convert_video_task(
            input_path, downscale, faster, use_mp3, audio_only, custom_bitrate, video_bitrate
        )
        result = audio_out if audio_only else video_out
        return result, result
    except Exception as e:
        return f"Error: {str(e)}", None
    finally:
        # The staged input is no longer needed once conversion has finished
        # (or failed); results live in a different folder and are untouched.
        if staged_glob is not None:
            for leftover in glob.glob(staged_glob):
                with contextlib.suppress(OSError):
                    os.remove(leftover)
def convert_video(*args):
    """Synchronous entry point for the UI.

    Forwards *args* to ``process_conversion`` and drives the resulting
    coroutine to completion with ``asyncio.run``, returning its result.
    """
    pending = process_conversion(*args)
    return asyncio.run(pending)
# Gradio Interface (Now With Custom Bitrate)
#
# NOTE(review): the emoji in the labels below were reconstructed from
# mis-decoded bytes, and the nesting of the layout containers was rebuilt
# from unindented text — confirm both against the original app.
with gr.Blocks(theme=gr.themes.Default(primary_hue="rose")) as demo:
    gr.Markdown("""
# 🎥 **Low Quality Video Inator**
Welcome to your personal bad-video-making machine.
Upload a video or paste a YouTube URL below, then tweak the settings to ruin your video (on purpose).
""")

    # Input source: either a YouTube URL or an uploaded file.
    with gr.Group():
        with gr.Row():
            use_youtube = gr.Checkbox(label="🌐 Use YouTube URL (experimental)", value=False)
            youtube_url = gr.Textbox(label="YouTube URL", placeholder="Paste YouTube URL here")
        video_file = gr.File(label="📁 Upload Video File")

    gr.Markdown("### ⚙️ **Conversion Settings**")
    with gr.Group():
        with gr.Row():
            downscale = gr.Checkbox(label="Downscale Video to 144p", value=False)
            faster = gr.Checkbox(label="Faster Video Compression", value=False)
        with gr.Row():
            use_mp3 = gr.Checkbox(label="Use MP3 Audio", value=False)
            audio_only = gr.Checkbox(label="Audio Only", value=False)
        with gr.Row():
            custom_bitrate = gr.Checkbox(label="Custom Video Bitrate", value=False)
            video_bitrate = gr.Number(label="Bitrate (kbps)", visible=False)

    # Show/hide bitrate field
    custom_bitrate.change(
        lambda checked: gr.update(visible=checked),
        inputs=[custom_bitrate],
        outputs=[video_bitrate]
    )

    convert_button = gr.Button("Convert Now", variant="primary")

    gr.Markdown("### **Conversion Preview**")
    video_preview = gr.Video(label="Preview Output (if applicable)")
    gr.Markdown("### **Download Your Masterpiece**")
    file_download = gr.File(label="Download Result")

    # The input order must match the positional parameters that
    # convert_video forwards to process_conversion.
    convert_button.click(
        convert_video,
        inputs=[
            use_youtube, youtube_url, video_file,
            downscale, faster, use_mp3, audio_only,
            custom_bitrate, video_bitrate
        ],
        outputs=[video_preview, file_download]
    )

demo.launch()