File size: 3,999 Bytes
ffd2caa
 
 
 
5a8c370
 
 
 
14a17dc
 
 
 
 
 
 
 
 
 
 
5a8c370
 
4cbb775
5a8c370
 
 
 
 
ffd2caa
5a8c370
 
 
 
 
 
 
 
14a17dc
 
40b32ea
14a17dc
 
ffd2caa
5a8c370
ffd2caa
66e2d43
 
 
 
 
 
 
 
 
 
 
 
ffd2caa
 
5a8c370
ffd2caa
 
 
 
66e2d43
ffd2caa
 
 
 
 
 
 
 
 
 
 
 
5a8c370
 
 
 
40b32ea
5a8c370
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ffd2caa
5a8c370
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import os
import requests
import shutil
import subprocess
from pydub import AudioSegment
import whisper
from speechbrain.pretrained.interfaces import foreign_class

def clear_tmp_dir(path):
    """Empty the directory at ``path`` without removing the directory itself.

    Regular files and symbolic links are unlinked; real sub-directories are
    removed recursively. A failure on one entry is printed and the remaining
    entries are still processed. Listing ``path`` happens outside the guarded
    section, so an error from ``os.listdir`` propagates to the caller.
    """
    for entry_name in os.listdir(path):
        target = os.path.join(path, entry_name)
        try:
            # Links are unlinked rather than followed, even when they point
            # at a directory.
            if os.path.islink(target) or os.path.isfile(target):
                os.unlink(target)
                continue
            if os.path.isdir(target):
                shutil.rmtree(target)
        except Exception as err:
            print(f'Failed to delete {target}. Reason: {err}')

class AccentAnalyzerTool:
    """Download a media file, classify the speaker's English accent and transcribe it.

    Two models are loaded once, at construction time: the Whisper ``"base"``
    model (transcription) and the SpeechBrain
    ``Jzuluaga/accent-id-commonaccent_xlsr-en-english`` classifier (accent
    identification).
    """

    # (connect, read) timeouts in seconds handed to ``requests.get``. The read
    # timeout applies to each wait for data, not to the whole download, so
    # large files still work while a stalled server no longer hangs forever.
    DOWNLOAD_TIMEOUT = (10, 60)

    def __init__(self):
        """Load the transcription and accent models."""
        self.whisper_model = whisper.load_model("base")
        self.accent_model = foreign_class(
            source="Jzuluaga/accent-id-commonaccent_xlsr-en-english",
            pymodule_file="custom_interface.py",
            classname="CustomEncoderWav2vec2Classifier"
        )
        # Transcript produced by the most recent analyze() call that reached
        # the transcription step; None before that.
        self.last_transcript = None

    def log(self, msg):
        """Print ``msg`` to stdout with a ``[AccentAnalyzerTool]`` prefix."""
        print(f"[AccentAnalyzerTool] {msg}")

    def _probe_video(self, video_path):
        """Log ffprobe's view of ``video_path``; purely diagnostic, never raises
        for a missing ffprobe binary or a non-zero ffprobe exit."""
        ffprobe_cmd = ["ffprobe", "-v", "error", "-show_format", "-show_streams", video_path]
        try:
            output = subprocess.check_output(ffprobe_cmd, stderr=subprocess.STDOUT)
            # Media metadata is not guaranteed to be valid UTF-8.
            self.log(f"ffprobe output:\n{output.decode(errors='replace')}")
        except subprocess.CalledProcessError as e:
            self.log(f"ffprobe error:\n{e.output.decode(errors='replace')}")
        except OSError as e:
            # ffprobe is not installed / not executable. This step is only a
            # debugging aid, so it must not abort the analysis.
            self.log(f"ffprobe unavailable, skipping probe: {e}")

    def analyze(self, url: str) -> str:
        """Fetch the media at ``url`` and report the speaker's accent and transcript.

        The file is downloaded into the relative ``tmp`` directory (created if
        missing, emptied otherwise), converted to WAV, classified, and
        transcribed. ``self.last_transcript`` is updated with the transcript.

        Args:
            url: URL fetched with a plain HTTP GET; the response body is
                written to ``tmp/video.mp4`` as-is.

        Returns:
            A Markdown summary with the accent label, the confidence as a
            percentage, and the transcript. If any step raises, the exception
            is caught and a string of the form
            ``"Error analyzing accent: <message>"`` is returned instead.
        """
        try:
            self.log("Downloading video...")
            tmp_dir = "tmp"
            if not os.path.exists(tmp_dir):
                os.makedirs(tmp_dir, exist_ok=True)
                os.chmod(tmp_dir, 0o777)
            else:
                clear_tmp_dir(tmp_dir)

            video_path = os.path.join(tmp_dir, "video.mp4")

            # Browser-like headers sent with the download request.
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                              "AppleWebKit/537.36 (KHTML, like Gecko) "
                              "Chrome/114.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,"
                          "image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
                "Accept-Language": "en-US,en;q=0.9",
                "Referer": "https://www.youtube.com/",
                "Connection": "keep-alive",
                "DNT": "1",
            }

            # The context manager releases the streamed connection even when
            # raise_for_status() or the file write fails.
            with requests.get(
                url, headers=headers, stream=True, timeout=self.DOWNLOAD_TIMEOUT
            ) as r:
                r.raise_for_status()
                with open(video_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)

            file_size = os.path.getsize(video_path)
            self.log(f"Downloaded video size: {file_size} bytes")
            if file_size < 1000:
                raise ValueError("Downloaded video file is too small or invalid")

            # Debug with ffprobe to check video validity
            self._probe_video(video_path)

            self.log("Extracting audio...")
            audio_path = os.path.join(tmp_dir, "audio.wav")
            AudioSegment.from_file(video_path).export(audio_path, format="wav")
            os.chmod(audio_path, 0o666)

            self.log("Classifying accent...")
            # Only the second (score) and fourth (label) items of the
            # classifier's 4-tuple are used.
            _, score, _, label = self.accent_model.classify_file(audio_path)
            # The 'us' label is rendered as "US"; other labels are capitalized.
            accent = label[0].upper() if label[0] == 'us' else label[0].capitalize()
            confidence = round(float(score) * 100, 2)

            self.log("Transcribing...")
            transcript = self.whisper_model.transcribe(audio_path)["text"]
            self.last_transcript = transcript

            summary = (
                f"The speaker has a **{accent} English accent** "
                f"with **{confidence}% confidence**.\n\n"
                f"**Transcript of the audio:**\n\n *{transcript.strip(' ')}*"
            )

            return summary

        except Exception as e:
            # Top-level boundary: callers get a readable message instead of
            # an exception.
            return f"Error analyzing accent: {str(e)}"