Spaces:
Sleeping
Sleeping
import os | |
import asyncio | |
import subprocess | |
import uuid | |
import glob | |
import shutil | |
import gradio as gr | |
# Mark the fdkaac binary in the working directory as executable.
os.system("chmod +x fdkaac")

# Value handed to ffmpeg's -hwaccel option by the video encoders below.
accel = 'auto'
# Default ffmpeg video options, used when no custom bitrate is requested.
video_base_opts = ['-crf', '63', '-c:v', 'libx264', '-tune', 'zerolatency']

# Working directories: fetched inputs and conversion results.
UPLOAD_FOLDER = 'uploads'
CONVERTED_FOLDER = 'converted'
for _work_dir in (UPLOAD_FOLDER, CONVERTED_FOLDER):
    os.makedirs(_work_dir, exist_ok=True)
async def run_subprocess(cmd, use_fdkaac=False):
    """Run *cmd* as a child process and return its decoded output.

    Args:
        cmd: Sequence holding the program and its arguments. It is executed
            directly, without a shell.
        use_fdkaac: When true, the current working directory is prepended to
            ``LD_LIBRARY_PATH`` in the child's environment (presumably so the
            fdkaac binary can find shared libraries stored beside it).

    Returns:
        A ``(stdout, stderr)`` tuple of text.

    Raises:
        subprocess.CalledProcessError: If the process exits with a non-zero
            status. The captured stderr text is available as ``.stderr`` and,
            for compatibility with earlier behaviour, also as ``.output``.
    """
    env = os.environ.copy()
    if use_fdkaac:
        # Only append the previous value when there is one, so the variable
        # never ends with a dangling ":" (an empty search-path entry).
        search_path = [os.path.abspath("./")]
        inherited = env.get("LD_LIBRARY_PATH", "")
        if inherited:
            search_path.append(inherited)
        env["LD_LIBRARY_PATH"] = ":".join(search_path)

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=env,
    )
    raw_stdout, raw_stderr = await process.communicate()

    # External tools may print bytes that are not valid UTF-8 (for example
    # media metadata); replace them instead of failing the whole conversion.
    stdout_text = raw_stdout.decode(errors="replace")
    stderr_text = raw_stderr.decode(errors="replace")

    if process.returncode != 0:
        raise subprocess.CalledProcessError(
            process.returncode,
            cmd,
            output=stderr_text,
            stderr=stderr_text,
        )
    return stdout_text, stderr_text
def _video_quality_args(custom_bitrate, video_bitrate, faster):
    """Return the ffmpeg video-quality arguments shared by both encode paths."""
    # Fall back to the default options when the custom bitrate is missing or
    # not positive, instead of crashing on int(None).
    if custom_bitrate and video_bitrate and video_bitrate > 0:
        args = ['-b:v', f"{int(video_bitrate)}k"]
    else:
        args = list(video_base_opts)
    if faster:
        args += ['-preset', 'ultrafast']
    return args


def _remove_quietly(paths):
    """Best-effort removal of files; missing or undeletable files are skipped."""
    for path in paths:
        try:
            os.remove(path)
        except OSError:
            pass


async def convert_video_task(input_path, downscale, faster, use_mp3, audio_only, custom_bitrate, video_bitrate, progress):
    """Convert *input_path* into a low-quality audio or video file.

    Args:
        input_path: Path of the media file to convert.
        downscale: When true, video is scaled to a height of 144 pixels.
        faster: When true, ffmpeg's ``ultrafast`` preset is used for video.
        use_mp3: When true, audio is encoded with libmp3lame through ffmpeg;
            otherwise audio is encoded with the ``./fdkaac`` binary.
        audio_only: When true, only an audio file is produced.
        custom_bitrate: When true (and *video_bitrate* is a positive number),
            video is encoded at *video_bitrate* kbps instead of the default
            options.
        video_bitrate: Video bitrate in kbps; may be ``None``.
        progress: Object exposing ``update(n, desc=...)``, called once per
            stage.

    Returns:
        ``(audio_path, None)`` when *audio_only* is true, otherwise
        ``(None, video_path)``. Result files live in ``CONVERTED_FOLDER``.

    Raises:
        subprocess.CalledProcessError: If ffmpeg or fdkaac fails.
    """
    def new_output(extension):
        return os.path.join(CONVERTED_FOLDER, f"{uuid.uuid4()}.{extension}")

    scale_args = ['-vf', 'scale=-2:144'] if downscale else []
    quality_args = _video_quality_args(custom_bitrate, video_bitrate, faster)

    if use_mp3:
        mp3_args = ['-c:a', 'libmp3lame', '-b:a', '8k', '-ar', '24000', '-ac', '1']
        if audio_only:
            progress.update(1, desc="Converting audio to MP3...")
            output_audio = new_output("mp3")
            await run_subprocess(
                ['ffmpeg', '-y', '-i', input_path, '-vn'] + mp3_args + [output_audio]
            )
            return output_audio, None
        # The video command below encodes the MP3 track itself, so no
        # standalone MP3 file is produced (it would only be left behind).
        progress.update(1, desc="Making the video quality lower...")
        output_video = new_output("mp4")
        cmd = ['ffmpeg', '-y', '-hwaccel', accel, '-i', input_path]
        cmd += scale_args + quality_args + mp3_args + [output_video]
        await run_subprocess(cmd)
        return None, output_video

    audio_wav = new_output("wav")
    audio_output = new_output("m4a")
    video_output = new_output("mp4")
    merged = new_output("mp4")
    result_path = None  # the one file that must survive cleanup
    try:
        # Extract mono 8 kHz PCM for the AAC encoder.
        progress.update(1, desc="Splitting Audio and Video...")
        await run_subprocess(['ffmpeg', '-y', '-i', input_path, '-ac', '1', '-ar', '8000', audio_wav])

        progress.update(1, desc="Converting audio to AAC which is lower quality...")
        await run_subprocess(
            ['./fdkaac', '-b', '1k', '-C', '-f', '2', '-G', '1', '-w', '8000', '-o', audio_output, audio_wav],
            use_fdkaac=True,
        )
        if audio_only:
            # No video encode is needed for an audio-only result; skipping it
            # also lets inputs without a video stream succeed.
            result_path = audio_output
            return audio_output, None

        progress.update(1, desc="Making the video quality lower...")
        cmd = ['ffmpeg', '-y', '-hwaccel', accel, '-i', input_path]
        cmd += scale_args + quality_args + ['-an', video_output]
        await run_subprocess(cmd)

        progress.update(1, desc="Mixing audio and video together...")
        await run_subprocess(['ffmpeg', '-y', '-i', video_output, '-i', audio_output, '-c', 'copy', merged])
        result_path = merged
        return None, merged
    finally:
        # Remove intermediates (and partial outputs after a failure).
        _remove_quietly(
            path
            for path in (audio_wav, audio_output, video_output, merged)
            if path != result_path
        )
class _StepProgress:
    """Adapter giving a ``gr.Progress`` tracker an ``update(n, desc=...)`` method."""

    def __init__(self, progress, total):
        self._progress = progress
        self._total = total
        self._done = 0

    def update(self, n=1, desc=None):
        # Clamp so an extra stage can never report more than 100 %.
        self._done = min(self._done + n, self._total)
        self._progress(self._done / self._total, desc=desc)


async def process_conversion(use_youtube, youtube_url, video_file, downscale, faster,
                             use_mp3, audio_only, custom_bitrate, video_bitrate,
                             progress=gr.Progress()):
    """Fetch the input media, convert it and return paths for the UI outputs.

    Args:
        use_youtube: When true, the input is downloaded with yt-dlp from
            *youtube_url*; otherwise *video_file* is used.
        youtube_url: URL passed to yt-dlp.
        video_file: Uploaded file, either a path string or an object whose
            ``name`` attribute is the path.
        downscale, faster, use_mp3, audio_only, custom_bitrate, video_bitrate:
            Forwarded unchanged to ``convert_video_task``.
        progress: Gradio progress tracker.

    Returns:
        A pair with the same path twice, one for each UI output: the audio
        file when *audio_only* is true, otherwise the video file.

    Raises:
        gr.Error: If no input was supplied or the download produced no file.
        subprocess.CalledProcessError: If an external tool fails.
    """
    # Stages: fetch, announce, up to four conversion stages, finalize.
    steps = _StepProgress(progress, total=7)
    fetched = []  # input copies to delete once the conversion is over
    try:
        # Step 1: fetch
        if use_youtube:
            url = (youtube_url or "").strip()
            if not url:
                raise gr.Error("Please enter a YouTube URL.")
            steps.update(1, desc="Downloading video...")
            yt_uuid = uuid.uuid4().hex
            out = os.path.join(UPLOAD_FOLDER, yt_uuid + ".%(ext)s")
            # "--" ends option parsing so the user-supplied URL can never be
            # interpreted as a yt-dlp command-line option.
            await run_subprocess(['yt-dlp', '-o', out, '-f', 'b', '--', url])
            fetched = glob.glob(os.path.join(UPLOAD_FOLDER, yt_uuid + ".*"))
            if not fetched:
                raise gr.Error("Download failed.")
            input_path = fetched[0]
        else:
            if video_file is None:
                raise gr.Error("Please upload a video file.")
            steps.update(1, desc="Preparing input video...")
            # Accept both a plain path and a file wrapper exposing ``.name``.
            fname = getattr(video_file, "name", video_file)
            ext = os.path.splitext(fname)[1]
            input_path = os.path.join(UPLOAD_FOLDER, uuid.uuid4().hex + ext)
            fetched = [input_path]
            shutil.copy2(fname, input_path)

        # Steps 2-5: convert
        if use_mp3:
            steps.update(1, desc="Converting audio to MP3...")
        else:
            steps.update(1, desc="Converting audio to AAC which is lower quality...")
        audio_out, video_out = await convert_video_task(
            input_path, downscale, faster, use_mp3, audio_only,
            custom_bitrate, video_bitrate, steps
        )

        # final update
        steps.update(1, desc="Finalizing...")
    finally:
        # Results are written to a separate folder, so the fetched input can
        # always be discarded.
        for path in fetched:
            try:
                os.remove(path)
            except OSError:
                pass

    if audio_only:
        return audio_out, audio_out
    return video_out, video_out
# Build UI
with gr.Blocks(theme=gr.themes.Default(primary_hue="rose")) as demo:
    gr.Markdown("# Low Quality Video Inator")

    # Input source: either a YouTube URL or an uploaded file.
    with gr.Row():
        use_youtube = gr.Checkbox(label="Use YouTube URL", value=False)
        youtube_url = gr.Textbox(label="YouTube URL")
        video_file = gr.File(label="Upload Video")

    # Conversion options. The caption must be passed as ``label=``: the first
    # positional parameter of gr.Checkbox is ``value``, so a positional
    # caption combined with ``value=False`` raises TypeError.
    with gr.Row():
        downscale = gr.Checkbox(label="Downscale Video to 144p", value=False)
        faster = gr.Checkbox(label="Faster Compression", value=False)
    with gr.Row():
        use_mp3 = gr.Checkbox(label="Use MP3 Audio", value=False)
        audio_only = gr.Checkbox(label="Audio Only", value=False)
    with gr.Row():
        custom_bitrate = gr.Checkbox(label="Custom Video Bitrate", value=False)
        video_bitrate = gr.Number(label="Bitrate (kbps)", visible=False)

    # Show the bitrate field only while "Custom Video Bitrate" is ticked.
    custom_bitrate.change(lambda x: gr.update(visible=x), custom_bitrate, video_bitrate)

    convert_button = gr.Button("Convert Now", variant="primary")
    video_preview = gr.Video()
    file_download = gr.File()

    convert_button.click(
        process_conversion,
        inputs=[use_youtube, youtube_url, video_file,
                downscale, faster, use_mp3, audio_only,
                custom_bitrate, video_bitrate],
        outputs=[video_preview, file_download]
    )

demo.launch()