RandomPersonRR's picture
Update app.py
2a111b0 verified
raw
history blame
18.8 kB
# app.py
import asyncio
import os
import re
import shutil
import uuid
import html
from pathlib import Path
from typing import Optional, Tuple
import gradio as gr
# -------------------------
# Config
# -------------------------
BASE_DIR = Path(".")
UPLOAD_FOLDER = BASE_DIR / "uploads"
CONVERTED_FOLDER = BASE_DIR / "converted"
UPLOAD_FOLDER.mkdir(exist_ok=True)
CONVERTED_FOLDER.mkdir(exist_ok=True)
ALLOWED_EXTENSIONS = {
".3g2", ".3gp", ".3gpp", ".avi", ".cavs", ".dv", ".dvr", ".flv",
".m2ts", ".m4v", ".mkv", ".mod", ".mov", ".mp4", ".mpeg", ".mpg",
".mts", ".mxf", ".ogg", ".rm", ".rmvb", ".swf", ".ts", ".vob",
".webm", ".wmv", ".wtv", ".ogv", ".opus", ".aac", ".ac3", ".aif",
".aifc", ".aiff", ".amr", ".au", ".caf", ".dss", ".flac", ".m4a",
".m4b", ".mp3", ".oga", ".voc", ".wav", ".weba", ".wma"
}
VIDEO_BASE_OPTS = ["-crf", "63", "-c:v", "libx264", "-tune", "zerolatency"]
ACCEL = "auto"
FFMPEG_TIME_RE = re.compile(r"time=(\d+):(\d+):(\d+\.\d+)")
# Ensure fdkaac executable bit if it exists in cwd
FDKAAC_PATH = Path("./fdkaac")
try:
if FDKAAC_PATH.exists():
FDKAAC_PATH.chmod(FDKAAC_PATH.stat().st_mode | 0o111)
except Exception:
pass
# -------------------------
# Helpers
# -------------------------
def is_audio_file(path: str) -> bool:
ext = Path(path).suffix.lower()
return ext in {".mp3", ".m4a", ".wav", ".aac", ".oga", ".ogg"}
def preview_html(percent: float, step: str, media_src: Optional[str] = None) -> str:
"""Return HTML containing media preview (video or audio) with overlayed progress/step."""
pct = max(0.0, min(100.0, percent))
esc_step = html.escape(str(step))
media_tag = ""
if media_src:
esc_src = html.escape(str(media_src))
if is_audio_file(media_src):
media_tag = f'<audio src="{esc_src}" controls style="width:100%;display:block;"></audio>'
else:
media_tag = f'<video src="{esc_src}" controls style="width:100%;height:auto;display:block;"></video>'
else:
media_tag = (
'<div style="width:100%;height:320px;display:flex;align-items:center;justify-content:center;'
'background:#0b0b0b;color:#fff;font-size:16px;">Preview will appear here once available</div>'
)
return f'''
<div style="position:relative;border-radius:8px;overflow:hidden;border:1px solid #ddd;">
{media_tag}
<div style="position:absolute;left:8px;right:8px;bottom:12px;pointer-events:none;">
<div style="background:rgba(0,0,0,0.52);padding:10px;border-radius:8px;color:#fff;font-family:system-ui,Segoe UI,Roboto,Arial;">
<div style="font-size:13px;margin-bottom:8px;"><strong>{esc_step}</strong></div>
<div style="height:12px;background:#222;border-radius:6px;overflow:hidden;">
<div style="width:{pct:.2f}%;height:100%;background:linear-gradient(90deg,#21d4fd,#b721ff);"></div>
</div>
<div style="font-size:12px;margin-top:6px;color:#ddd;">{pct:.1f}%</div>
</div>
</div>
</div>
'''
async def run_command_capture(cmd, cwd=None, env=None) -> Tuple[str, str, int]:
"""Run command to completion, capture stdout/stderr."""
proc = await asyncio.create_subprocess_exec(
*cmd,
cwd=cwd,
env=env or os.environ.copy(),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
return stdout.decode(errors="ignore"), stderr.decode(errors="ignore"), proc.returncode
async def get_duration_seconds(path: Path) -> Optional[float]:
"""Get duration using ffprobe (seconds) or None if unknown."""
cmd = [
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
str(path)
]
stdout, stderr, rc = await run_command_capture(cmd)
if rc != 0 or not stdout:
return None
try:
return float(stdout.strip())
except Exception:
return None
def which_cmd(name: str) -> Optional[str]:
return shutil.which(name)
# -------------------------
# Converter generator
# streams tuples: (preview_html_str, download_path_or_None)
# -------------------------
async def convert_stream(
use_youtube: bool,
youtube_url: str,
video_file, # gr.File
downscale: bool,
faster: bool,
use_mp3: bool,
audio_only: bool,
custom_bitrate: bool,
video_bitrate: float
):
# initial
yield preview_html(0.0, "Starting..."), None
temp_files = []
input_path: Optional[Path] = None
try:
# SOURCE
if use_youtube:
if not youtube_url:
yield preview_html(0.0, "Error: YouTube URL required."), None
return
if not which_cmd("yt-dlp"):
yield preview_html(0.0, "yt-dlp not found on server. Please upload the file manually."), None
return
yield preview_html(1.0, "Attempting YouTube download..."), None
out_uuid = uuid.uuid4().hex
out_template = str(UPLOAD_FOLDER / f"{out_uuid}.%(ext)s")
ytdlp_cmd = ["yt-dlp", "-f", "b", "-o", out_template, youtube_url]
stdout, stderr, rc = await run_command_capture(ytdlp_cmd)
combined = (stdout or "") + "\n" + (stderr or "")
if rc != 0:
if "Video unavailable" in combined or "This video is unavailable" in combined:
yield preview_html(0.0, "Video unavailable (removed/private/age-restricted)."), None
return
yield preview_html(0.0, "Could not download from YouTube from this server (cloud host blocked). Please upload manually."), None
return
files = list(UPLOAD_FOLDER.glob(f"{out_uuid}.*"))
if not files:
yield preview_html(0.0, "Download completed but file not found."), None
return
input_path = files[0]
temp_files.append(input_path)
else:
if not video_file:
yield preview_html(0.0, "No video provided. Upload or use YouTube URL."), None
return
try:
ext = Path(video_file.name).suffix.lower()
except Exception:
ext = None
if ext not in ALLOWED_EXTENSIONS:
yield preview_html(0.0, f"Unsupported file type: {ext}"), None
return
input_path = UPLOAD_FOLDER / f"{uuid.uuid4().hex}{ext}"
shutil.copy2(video_file.name, input_path)
temp_files.append(input_path)
yield preview_html(1.0, "Probing duration..."), None
total_seconds = await get_duration_seconds(input_path)
if not total_seconds:
yield preview_html(0.0, "Warning: duration unknown; progress will be step-based."), None
# AUDIO-ONLY PATH
if audio_only:
# MP3 branch
if use_mp3:
step = "Converting to MP3..."
out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp3"
ffmpeg_cmd = [
"ffmpeg", "-y", "-i", str(input_path),
"-ac", "1", "-ar", "24000", "-b:a", "8k", str(out_audio)
]
yield preview_html(2.0, step), None
if total_seconds:
proc = await asyncio.create_subprocess_exec(*ffmpeg_cmd, stderr=asyncio.subprocess.PIPE)
last = 0.0
while True:
line = await proc.stderr.readline()
if not line:
break
txt = line.decode(errors="ignore")
m = FFMPEG_TIME_RE.search(txt)
if m:
hh, mm, ss = m.groups()
current = int(hh) * 3600 + int(mm) * 60 + float(ss)
pct = (current / total_seconds) * 100.0
if pct - last >= 0.5:
last = pct
yield preview_html(pct, step), None
await proc.wait()
if proc.returncode != 0:
yield preview_html(0.0, "ffmpeg failed while encoding MP3."), None
return
else:
stdout, stderr, rc = await run_command_capture(ffmpeg_cmd)
if rc != 0:
yield preview_html(0.0, "ffmpeg failed while encoding MP3."), None
return
yield preview_html(100.0, "MP3 conversion finished.", media_src=str(out_audio)), str(out_audio)
return
# AAC via fdkaac branch (audio-only)
# Ensure fdkaac exists
if not FDKAAC_PATH.exists():
yield preview_html(0.0, "fdkaac not found at ./fdkaac — please add it to the app folder and make executable."), None
return
step = "Preparing WAV for fdkaac..."
yield preview_html(2.0, step), None
wav_tmp = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.wav"
aac_out = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.m4a"
# Generate WAV (low sample rate)
ffmpeg_wav_cmd = ["ffmpeg", "-y", "-i", str(input_path), "-ac", "1", "-ar", "8000", str(wav_tmp)]
stdout, stderr, rc = await run_command_capture(ffmpeg_wav_cmd)
if rc != 0:
yield preview_html(0.0, "ffmpeg failed to produce WAV for fdkaac."), None
try:
wav_tmp.unlink()
except Exception:
pass
return
# Run fdkaac to produce m4a
fdkaac_cmd = [
"./fdkaac", "-b", "1k", "-C", "-f", "2", "-G", "1", "-w", "8000",
"-o", str(aac_out), str(wav_tmp)
]
yield preview_html(5.0, "Encoding AAC with fdkaac..."), None
stdout, stderr, rc = await run_command_capture(fdkaac_cmd)
try:
wav_tmp.unlink()
except Exception:
pass
if rc != 0:
yield preview_html(0.0, f"fdkaac failed: {stderr[:300] if stderr else 'no output'}"), None
return
yield preview_html(100.0, "AAC (fdkaac) audio ready.", media_src=str(aac_out)), str(aac_out)
return
# ----------------------
# FULL VIDEO FLOW
# ----------------------
# 1) Audio encode (via fdkaac or mp3)
out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.m4a"
if use_mp3:
ffmpeg_audio_cmd = ["ffmpeg", "-y", "-i", str(input_path), "-ac", "1", "-ar", "8000", "-c:a", "libmp3lame", "-b:a", "8k", str(out_audio)]
yield preview_html(2.0, "Encoding audio (MP3)..."), None
if total_seconds:
proc = await asyncio.create_subprocess_exec(*ffmpeg_audio_cmd, stderr=asyncio.subprocess.PIPE)
last = 0.0
while True:
line = await proc.stderr.readline()
if not line:
break
txt = line.decode(errors="ignore")
m = FFMPEG_TIME_RE.search(txt)
if m:
hh, mm, ss = m.groups()
current = int(hh) * 3600 + int(mm) * 60 + float(ss)
pct = (current / total_seconds) * 100.0 * 0.20
if pct - last >= 0.5:
last = pct
yield preview_html(pct, "Encoding audio (MP3)..."), None
await proc.wait()
if proc.returncode != 0:
yield preview_html(0.0, "ffmpeg audio encoding failed."), None
return
else:
stdout, stderr, rc = await run_command_capture(ffmpeg_audio_cmd)
if rc != 0:
yield preview_html(0.0, "ffmpeg audio encoding failed."), None
return
else:
# Use fdkaac path for AAC
if not FDKAAC_PATH.exists():
yield preview_html(0.0, "fdkaac not found at ./fdkaac — please add it to the app folder and make executable."), None
return
# 1a: Create WAV
yield preview_html(2.0, "Generating WAV for fdkaac..."), None
wav_tmp = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.wav"
ffmpeg_wav_cmd = ["ffmpeg", "-y", "-i", str(input_path), "-ac", "1", "-ar", "8000", str(wav_tmp)]
stdout, stderr, rc = await run_command_capture(ffmpeg_wav_cmd)
if rc != 0:
yield preview_html(0.0, "ffmpeg failed generating WAV for fdkaac."), None
try:
wav_tmp.unlink()
except Exception:
pass
return
# 1b: Run fdkaac
aac_tmp = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.m4a"
fdkaac_cmd = [
"./fdkaac", "-b", "1k", "-C", "-f", "2", "-G", "1", "-w", "8000",
"-o", str(aac_tmp), str(wav_tmp)
]
yield preview_html(6.0, "Encoding AAC with fdkaac..."), None
stdout, stderr, rc = await run_command_capture(fdkaac_cmd)
try:
wav_tmp.unlink()
except Exception:
pass
if rc != 0:
yield preview_html(0.0, f"fdkaac failed: {stderr[:300] if stderr else 'no output'}"), None
return
out_audio = aac_tmp
# 2) Video encode
step_video = "Encoding video track..."
yield preview_html(20.0, step_video), None
out_video = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp4"
ffmpeg_video_cmd = ["ffmpeg", "-y", "-hwaccel", ACCEL, "-i", str(input_path)]
if downscale:
ffmpeg_video_cmd += ["-vf", "scale=-2:144"]
if custom_bitrate and video_bitrate:
ffmpeg_video_cmd += ["-b:v", f"{int(video_bitrate)}k"]
else:
ffmpeg_video_cmd += VIDEO_BASE_OPTS
if faster:
ffmpeg_video_cmd += ["-preset", "ultrafast"]
ffmpeg_video_cmd += ["-an", str(out_video)]
if total_seconds:
proc = await asyncio.create_subprocess_exec(*ffmpeg_video_cmd, stderr=asyncio.subprocess.PIPE)
last = 20.0
while True:
line = await proc.stderr.readline()
if not line:
break
txt = line.decode(errors="ignore")
m = FFMPEG_TIME_RE.search(txt)
if m:
hh, mm, ss = m.groups()
current = int(hh) * 3600 + int(mm) * 60 + float(ss)
pct_video = (current / total_seconds) * 100.0
combined = 20.0 + (pct_video * 0.70)
if combined - last >= 0.5:
last = combined
yield preview_html(combined, step_video), None
await proc.wait()
if proc.returncode != 0:
yield preview_html(0.0, "ffmpeg video encoding failed."), None
return
else:
stdout, stderr, rc = await run_command_capture(ffmpeg_video_cmd)
if rc != 0:
yield preview_html(0.0, "ffmpeg video encoding failed."), None
return
# 3) Merge audio + video
yield preview_html(90.0, "Merging audio & video..."), None
merged_out = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp4"
merge_cmd = ["ffmpeg", "-y", "-i", str(out_video), "-i", str(out_audio), "-c", "copy", str(merged_out)]
stdout, stderr, rc = await run_command_capture(merge_cmd)
if rc != 0:
# fallback re-encode audio into AAC during merge
merge_cmd = ["ffmpeg", "-y", "-i", str(out_video), "-i", str(out_audio), "-c:v", "copy", "-c:a", "aac", str(merged_out)]
stdout, stderr, rc = await run_command_capture(merge_cmd)
if rc != 0:
yield preview_html(0.0, "Merging audio and video failed."), None
return
# cleanup intermediate audio/video
for p in (out_audio, out_video):
try:
p.unlink()
except Exception:
pass
# final
yield preview_html(100.0, "Conversion complete!", media_src=str(merged_out)), str(merged_out)
return
except Exception as e:
yield preview_html(0.0, f"Error: {e}"), None
return
finally:
# remove any temp files downloaded
for p in temp_files:
try:
p.unlink()
except Exception:
pass
# -------------------------
# Gradio UI
# -------------------------
with gr.Blocks(title="Low Quality Video Inator (fdkaac)") as demo:
gr.Markdown("# Low Quality Video Inator\nUpload a file or paste a YouTube URL. The app streams a step-aware overlayed progress bar while encoding.")
with gr.Row():
use_youtube = gr.Checkbox(label="Use YouTube URL (server will try to download first)", value=False)
youtube_url = gr.Textbox(label="YouTube URL", placeholder="https://youtube.com/...", lines=1)
video_file = gr.File(label="Upload Video File", file_types=list(ALLOWED_EXTENSIONS), file_count="single")
gr.Markdown("### Conversion Settings")
with gr.Row():
downscale = gr.Checkbox(label="Downscale to 144p", value=False)
faster = gr.Checkbox(label="Faster encoding (lower quality)", value=False)
with gr.Row():
use_mp3 = gr.Checkbox(label="Use MP3 audio", value=False)
audio_only = gr.Checkbox(label="Audio Only", value=False)
with gr.Row():
custom_bitrate = gr.Checkbox(label="Use custom video bitrate (kbps)", value=False)
video_bitrate = gr.Number(label="Video bitrate (kbps)", value=64, visible=False)
def toggle_bitrate(v):
return gr.update(visible=v)
custom_bitrate.change(toggle_bitrate, inputs=[custom_bitrate], outputs=[video_bitrate])
convert_btn = gr.Button("Convert Now", variant="primary")
preview_html_el = gr.HTML("<div>Ready. Preview will appear here.</div>", label="Preview")
download_file = gr.File(label="Download Result")
convert_btn.click(
fn=convert_stream,
inputs=[use_youtube, youtube_url, video_file, downscale, faster, use_mp3, audio_only, custom_bitrate, video_bitrate],
outputs=[preview_html_el, download_file]
)
demo.launch(share=False)