Spaces:
Running
Running
import streamlit as st | |
import os | |
import tempfile | |
import torch | |
import json | |
import urllib.request | |
from urllib.parse import urlparse | |
from moviepy import VideoFileClip, AudioFileClip | |
from speechbrain.pretrained.interfaces import foreign_class | |
import yt_dlp | |
from pydub import AudioSegment | |
from pydub.silence import detect_nonsilent | |
# Directory where the pretrained accent model is downloaded and cached on disk.
model_dir = "/tmp/pretrained_models"
os.makedirs(model_dir, exist_ok=True)


@st.cache_resource
def _load_classifier():
    """Load the accent classifier once per server process.

    Streamlit re-executes this script on every rerun, so a plain module-level
    call would reload the model each time. ``st.cache_resource`` keeps a
    single shared instance across reruns and sessions.
    """
    return foreign_class(
        source="Jzuluaga/accent-id-commonaccent_xlsr-en-english",
        pymodule_file="custom_interface.py",
        classname="CustomEncoderWav2vec2Classifier",
        savedir=model_dir,
    )


# Load model once
classifier = _load_classifier()
def extract_loom_id(url):
    """Return the last non-empty path segment of *url*, used as the Loom video id.

    Only the URL path is inspected, so query strings and fragments are
    ignored. A trailing slash is tolerated. Returns an empty string when the
    URL has no path segment.
    """
    path = urlparse(url).path
    # Strip trailing slashes so ".../share/<id>/" still yields "<id>".
    return path.rstrip("/").split("/")[-1]
def download_loom_video(url, filename):
    """Download the Loom video referenced by *url* and save it to *filename*.

    The video id is taken from the URL, the Loom "transcoded-url" endpoint is
    queried with a POST request, and the media URL found under the ``"url"``
    key of the JSON reply is downloaded to *filename*.

    Returns:
        *filename*, once the download has completed.

    Raises:
        RuntimeError: on any failure (bad URL, HTTP error, unexpected reply).
            The original exception is attached as ``__cause__``.
    """
    try:
        video_id = extract_loom_id(url)
        if not video_id:
            raise ValueError("no video id found in the URL")
        request = urllib.request.Request(
            url=f"https://www.loom.com/api/campaigns/sessions/{video_id}/transcoded-url",
            headers={},
            method="POST",
        )
        # Close the HTTP response deterministically instead of leaking it.
        with urllib.request.urlopen(request) as response:
            content = json.loads(response.read().decode("utf-8"))
        urllib.request.urlretrieve(content["url"], filename)
        return filename
    except Exception as e:
        raise RuntimeError(f"Failed to download video from Loom: {e}") from e
def download_youtube_audio(url):
    """Download the audio track of a YouTube video and convert it to WAV.

    yt-dlp fetches the best available audio and transcodes it to mp3 inside a
    private temporary directory; the mp3 is then rewritten as a WAV file.
    Every call uses unique paths, so concurrent sessions cannot overwrite
    each other's files.

    Returns:
        Path of the WAV file. The caller is responsible for deleting it.

    Raises:
        RuntimeError: on any download or conversion failure. The original
            exception is attached as ``__cause__``.
    """
    wav_path = None
    try:
        # The intermediate mp3 lives in a throwaway directory that is removed
        # on exit, whether or not the conversion succeeds.
        with tempfile.TemporaryDirectory() as work_dir:
            ydl_opts = {
                'format': 'bestaudio/best',
                'outtmpl': os.path.join(work_dir, 'yt_audio.%(ext)s'),
                'quiet': True,
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '64',
                }],
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

            # Reserve a unique output path that outlives the work directory.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as wav_file:
                wav_path = wav_file.name

            audioclip = AudioFileClip(os.path.join(work_dir, "yt_audio.mp3"))
            try:
                audioclip.write_audiofile(wav_path, logger=None)
            finally:
                audioclip.close()
        return wav_path
    except Exception as e:
        # Do not leave a partial WAV behind when the conversion failed.
        if wav_path is not None:
            try:
                os.remove(wav_path)
            except OSError:
                pass
        raise RuntimeError(f"Failed to download from YouTube: {e}") from e
def download_direct_video(url):
    """Download the file at *url* into a temporary ``.mp4`` file.

    The response body is streamed to disk in chunks rather than read into
    memory in one piece, and the HTTP response is always closed.

    Returns:
        Path of the temporary file. The caller is responsible for deleting it.

    Raises:
        RuntimeError: if the URL cannot be opened, the response status is not
            200, or writing fails. The original exception is attached as
            ``__cause__``.
    """
    import shutil

    temp_path = None
    try:
        with urllib.request.urlopen(url) as response:
            if response.status != 200:
                raise RuntimeError("Failed to download video.")
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
                temp_path = temp_file.name
                shutil.copyfileobj(response, temp_file)
        return temp_path
    except Exception as e:
        # Remove a partially written file so failed downloads do not pile up.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass
        raise RuntimeError(f"Failed to download video : {e}") from e
def extract_audio(video_path):
    """Write the audio track of the video at *video_path* to a WAV file.

    The output path is *video_path* with its extension replaced by ``.wav``.

    Returns:
        Path of the WAV file that was written.

    Raises:
        RuntimeError: if the video cannot be opened, has no audio track, or
            the audio cannot be written. The original exception is attached
            as ``__cause__``.
    """
    try:
        # splitext swaps only the real extension; str.replace would leave the
        # path unchanged for e.g. ".MP4" and so overwrite the input video.
        wav_path = os.path.splitext(video_path)[0] + ".wav"
        clip = VideoFileClip(video_path)
        try:
            if clip.audio is None:
                raise ValueError("the video has no audio track")
            clip.audio.write_audiofile(wav_path)
        finally:
            # Always release the clip's underlying reader resources.
            clip.close()
        return wav_path
    except Exception as e:
        raise RuntimeError(f"Fail to extract the video : {e}") from e
def get_speech_segments(audio_path, min_silence_len=700, silence_thresh=-40, duration=10000):
    """Export the beginning of the first non-silent stretch of a WAV file.

    Non-silent ranges are detected with pydub's ``detect_nonsilent``. The
    exported clip starts where the first range starts and lasts at most
    *duration* milliseconds, clamped to the end of the audio.

    Args:
        audio_path: Path of the WAV file to analyse.
        min_silence_len: Passed through to ``detect_nonsilent``.
        silence_thresh: Passed through to ``detect_nonsilent``.
        duration: Maximum length of the exported clip, in milliseconds.

    Returns:
        Path of a newly created temporary WAV file holding the clip. The
        caller is responsible for deleting it.

    Raises:
        ValueError: if no non-silent range is found in the audio.
    """
    audio = AudioSegment.from_wav(audio_path)
    nonsilent_ranges = detect_nonsilent(
        audio,
        min_silence_len=min_silence_len,
        silence_thresh=silence_thresh,
    )
    if not nonsilent_ranges:
        raise ValueError("No speech detected in the audio.")

    start_ms = nonsilent_ranges[0][0]
    end_ms = min(start_ms + duration, len(audio))

    # A unique file per call keeps concurrent sessions from overwriting
    # each other's clip.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
        temp_path = temp_file.name
    audio[start_ms:end_ms].export(temp_path, format="wav")
    return temp_path


def classify_audio(wav_path):
    """Classify the accent heard at the start of the speech in *wav_path*.

    Returns:
        A ``(label, confidence)`` tuple: ``label`` is the label value
        returned by the classifier, ``confidence`` is its first score
        multiplied by 100.
    """
    segment_path = get_speech_segments(wav_path)
    try:
        _out_prob, score, _index, label = classifier.classify_file(segment_path)
    finally:
        # The clip is only needed for this one classification call.
        try:
            os.remove(segment_path)
        except OSError:
            pass
    confidence = float(score[0]) * 100  # convert tensor to float
    return label, confidence
def delete_file(path):
    """Delete the file at *path*, ignoring failures (best-effort cleanup).

    Does nothing when *path* is ``None`` or empty, so callers can pass a path
    variable that was never assigned. File-system errors such as a missing
    file or denied permission are suppressed.
    """
    if not path:
        return
    try:
        os.remove(path)
    except OSError:
        # Cleanup must never mask the result or error of the main operation.
        pass
# Streamlit UI: one form that takes either a video URL or an uploaded file,
# converts the input to WAV, classifies the accent and shows the result.
st.title("Accent Classifier for English Speakers")
with st.form("Input your video (it can be video link or upload)"):
    video_url = st.text_input(
        "Enter video URL (YouTube, Loom, or .mp4)"
    )
    uploaded_file = st.file_uploader(
        # The label lists the same extensions the uploader accepts.
        "Or upload a video file (mp4, mov, or avi)",
        type=["mp4", "mov", "avi"],
    )
    if st.form_submit_button("Process"):
        video_url = video_url.strip()
        video_path = None
        wav_path = None
        input_error = None  # message for invalid or missing user input
        try:
            with st.spinner('Processing video... Please wait'):
                if video_url:
                    if "youtube.com" in video_url or "youtu.be" in video_url:
                        wav_path = download_youtube_audio(video_url)
                    elif "loom.com" in video_url:
                        # Reserve a path and close the handle before the
                        # download writes to it.
                        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
                            video_path = tmp.name
                        download_loom_video(video_url, video_path)
                        wav_path = extract_audio(video_path)
                    elif urlparse(video_url).path.lower().endswith(".mp4"):
                        # Test the URL path only, so query strings and
                        # upper-case extensions are accepted.
                        video_path = download_direct_video(video_url)
                        wav_path = extract_audio(video_path)
                    else:
                        input_error = "URL Format unrecognized."
                elif uploaded_file is not None:
                    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
                        video_path = tmp.name
                        tmp.write(uploaded_file.read())
                    wav_path = extract_audio(video_path)
                else:
                    input_error = "Please upload a file or link"

                if input_error:
                    # Show the specific input problem only, not a second
                    # generic error after it.
                    st.error(input_error)
                elif wav_path:
                    label, confidence = classify_audio(wav_path)
                    st.success(f"Video Accent: **{label}**")
                    st.info(f"Confidence Score: **{confidence:.2f}%**")
                else:
                    st.error("Error processing video")
        except Exception as e:
            # Top-level boundary: surface any failure to the user.
            st.error(str(e))
        finally:
            # Remove whichever temporary files were actually created.
            for leftover in (wav_path, video_path):
                if leftover:
                    delete_file(leftover)