# Spaces: Running  (page-status text captured with the listing; not part of the program)
import os | |
import requests | |
import shutil | |
import subprocess | |
from pydub import AudioSegment | |
import whisper | |
from speechbrain.pretrained.interfaces import foreign_class | |
import torch | |
# Module-wide torch device: CUDA when torch reports it as available,
# otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
def clear_tmp_dir(path):
    """Remove every entry inside *path* while keeping *path* itself.

    Real subdirectories are removed recursively. Everything else -- regular
    files, symlinks (including symlinks that point at directories) and
    special entries such as FIFOs or sockets -- is unlinked. Deletion is
    best-effort: a failure on one entry is printed and the remaining
    entries are still processed.

    Args:
        path: Directory whose contents should be removed.

    Raises:
        OSError: If *path* itself cannot be listed (e.g. it does not exist).
    """
    for filename in os.listdir(path):
        file_path = os.path.join(path, filename)
        try:
            # Only a real directory is walked recursively; a symlink to a
            # directory is unlinked so its target is never touched.
            if os.path.isdir(file_path) and not os.path.islink(file_path):
                shutil.rmtree(file_path)
            else:
                # Covers regular files, symlinks and special files alike, so
                # nothing is silently left behind.
                os.unlink(file_path)
        except Exception as e:
            # Deliberately broad: cleanup must not abort on a single bad entry.
            print(f'Failed to delete {file_path}. Reason: {e}')
class AccentAnalyzerTool:
    """Download a video, classify the speaker's English accent and transcribe it.

    The accent classifier is loaded once in ``__init__``. The Whisper
    transcription model is loaded inside every ``analyze`` call and is not
    kept on the instance.

    Attributes are documented inline in ``__init__``.
    """

    # (connect, read) timeouts in seconds for the video download, so that a
    # stalled server cannot block ``analyze`` forever.
    DOWNLOAD_TIMEOUT = (10, 60)

    def __init__(self):
        """Load the accent-identification model and place it on ``device``."""
        # Whisper is intentionally not loaded here; see ``analyze``.
        self.accent_model = foreign_class(
            source="Jzuluaga/accent-id-commonaccent_xlsr-en-english",
            pymodule_file="custom_interface.py",
            classname="CustomEncoderWav2vec2Classifier"
        )
        self.accent_model.device = torch.device(device)
        # Transcript produced by the most recent successful ``analyze`` call.
        self.last_transcript = None

    def log(self, msg):
        """Print *msg* prefixed with the tool name."""
        print(f"[AccentAnalyzerTool] {msg}")

    def analyze(self, url: str) -> str:
        """Analyze the speaker's accent in the video found at *url*.

        The video is downloaded into a local ``tmp`` directory (created if
        missing, emptied otherwise), its audio is exported to WAV, the accent
        model classifies the audio and Whisper transcribes it. On success
        ``self.last_transcript`` is updated.

        Args:
            url: Direct URL of the video file to download.

        Returns:
            A Markdown-formatted summary with the accent, the confidence
            percentage and the transcript, or a string starting with
            ``"Error analyzing accent: "`` if any step raised an exception.
        """
        try:
            self.log("Downloading video...")
            tmp_dir = "tmp"
            if not os.path.exists(tmp_dir):
                os.makedirs(tmp_dir, exist_ok=True)
                # Mode 0o777: the scratch directory is accessible to every user.
                os.chmod(tmp_dir, 0o777)
            else:
                clear_tmp_dir(tmp_dir)

            video_path = os.path.join(tmp_dir, "video.mp4")
            self._download_video(url, video_path)

            file_size = os.path.getsize(video_path)
            self.log(f"Downloaded video size: {file_size} bytes")
            if file_size < 1000:
                raise ValueError("Downloaded video file is too small or invalid")

            # Debug aid only; never allowed to abort the analysis.
            self._probe_video(video_path)

            self.log("Extracting audio...")
            audio_path = os.path.join(tmp_dir, "audio.wav")
            AudioSegment.from_file(video_path).export(audio_path, format="wav")
            os.chmod(audio_path, 0o666)

            self.log("Classifying accent...")
            _, score, _, label = self.accent_model.classify_file(audio_path)
            accent = self._format_accent(label[0])
            confidence = round(float(score) * 100, 2)

            self.log("Transcribing...")
            # Loaded on every call rather than cached on the instance.
            whisper_model = whisper.load_model("tiny", device=device)
            transcript = whisper_model.transcribe(audio_path)["text"]
            self.last_transcript = transcript

            summary = (
                f"The speaker has a **{accent} English accent** "
                f"with **{confidence}% confidence**.\n\n"
                f"**Transcript of the audio:**\n\n *{transcript.strip(' ')}*"
            )
            return summary
        except Exception as e:
            # Top-level boundary: callers receive an error string, not an exception.
            return f"Error analyzing accent: {str(e)}"

    def _download_video(self, url, video_path):
        """Stream the body of *url* into *video_path* using browser-like headers."""
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/114.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,"
                      "image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Language": "en-US,en;q=0.9",
            "Referer": "https://www.youtube.com/",
            "Connection": "keep-alive",
            "DNT": "1",
        }
        # The context manager closes the streamed response (and releases its
        # connection) even when raise_for_status() or a write fails.
        with requests.get(
            url, headers=headers, stream=True, timeout=self.DOWNLOAD_TIMEOUT
        ) as r:
            r.raise_for_status()
            with open(video_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

    def _probe_video(self, video_path):
        """Log ffprobe's report for *video_path*; log, but never raise, on failure."""
        ffprobe_cmd = ["ffprobe", "-v", "error", "-show_format", "-show_streams", video_path]
        try:
            raw = subprocess.check_output(ffprobe_cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            details = e.output.decode(errors="replace")
            self.log(f"ffprobe error:\n{details}")
        except OSError as e:
            # e.g. the ffprobe executable is missing; this probe is optional.
            self.log(f"ffprobe could not be run: {e}")
        else:
            # errors="replace": odd metadata bytes must not abort the analysis.
            output = raw.decode(errors="replace")
            self.log(f"ffprobe output:\n{output}")

    @staticmethod
    def _format_accent(label):
        """Return the display form of an accent label: 'us' -> 'US', else capitalized."""
        return label.upper() if label == 'us' else label.capitalize()