File size: 3,581 Bytes
9556d07
 
 
 
 
c8c786a
9556d07
 
c8c786a
 
 
 
 
 
 
 
9556d07
 
 
 
c8c786a
 
 
9556d07
 
c8c786a
9556d07
 
 
 
 
 
ea2a9ca
 
 
 
 
9556d07
 
c8c786a
 
 
9556d07
 
 
c8c786a
 
 
 
 
 
 
9556d07
 
c8c786a
 
 
 
 
 
 
 
 
 
 
 
9556d07
c8c786a
 
 
 
 
 
 
 
 
 
 
324dee0
9556d07
 
 
 
 
 
c8c786a
9556d07
 
 
 
 
 
 
 
 
c8c786a
 
9556d07
 
c8c786a
9556d07
 
 
 
c8c786a
 
 
9556d07
c8c786a
 
9556d07
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import torch
import numpy as np
# import sounddevice as sd
import torch
import numpy as np
import datetime


def int2float(sound):
    """Convert 16-bit PCM samples to float32 in [-1.0, 1.0).

    Scales by the int16 full-scale value (32768) — the fixed scale the
    Silero VAD model expects — and squeezes trailing singleton dims so a
    (n, 1) block becomes a flat (n,) vector.

    Args:
        sound: numpy array of int16 PCM samples (any shape).

    Returns:
        float32 numpy array of the same data, squeezed, scaled to [-1, 1).
    """
    # Note: an all-zero array is unchanged by the division, so no special
    # case is needed (the original guarded on abs().max() > 0 — dead logic
    # for a fixed-constant scale).
    sound = sound.astype(np.float32) / 32768.0
    return sound.squeeze()  # depends on the use case

class VoiceActivityController:
    """Streaming voice-activity detector built on the Silero VAD model.

    Feeds fixed-size audio chunks through Silero VAD and tracks runs of
    speech and silence in samples. A segment is marked *final* once at
    least ``min_speech_to_final_ms`` of speech has been followed by
    ``min_silence_to_final_ms`` of silence.

    NOTE(review): ``__init__`` downloads/loads the model via ``torch.hub``,
    which requires network access on first use.
    """

    def __init__(
            self,
            sampling_rate=16000,
            min_silence_to_final_ms=500,
            min_speech_to_final_ms=100,
            min_silence_duration_ms=100,
            use_vad_result=True,
            activity_detected_callback=None,
            threshold=0.3
        ):
        """Load the Silero VAD model and precompute sample-count limits.

        Args:
            sampling_rate: input audio sample rate in Hz.
            min_silence_to_final_ms: trailing silence required to finalize.
            min_speech_to_final_ms: minimum speech required to finalize.
            min_silence_duration_ms: silence run length before audio is
                suppressed by ``apply_vad``.
            use_vad_result: if True, silent chunks past the suppression
                limit are replaced with an empty buffer.
            activity_detected_callback: optional zero-arg callable invoked
                whenever a chunk contains speech.
            threshold: speech probability cutoff in [0, 1].
        """
        self.activity_detected_callback = activity_detected_callback
        self.model, self.utils = torch.hub.load(
            repo_or_dir='snakers4/silero-vad',
            model='silero_vad'
        )

        self.sampling_rate = sampling_rate
        # Convert all millisecond thresholds to sample counts once.
        self.final_silence_limit = min_silence_to_final_ms * self.sampling_rate / 1000
        self.final_speech_limit = min_speech_to_final_ms * self.sampling_rate / 1000
        self.min_silence_samples = sampling_rate * min_silence_duration_ms / 1000

        self.use_vad_result = use_vad_result
        self.last_marked_chunk = None
        self.threshold = threshold
        self.reset_states()

    def reset_states(self):
        """Reset the VAD model's internal state and the sample counters."""
        self.model.reset_states()
        self.temp_end = 0        # sample index where the current silence run began (0 = none)
        self.current_sample = 0  # total samples consumed so far

    def apply_vad(self, audio):
        """Classify one audio chunk as speech or silence.

        Args:
            audio: numpy array of int16 PCM samples for a single window.

        Returns:
            Tuple ``(voice_audio, speech_samples, silence_samples)`` where
            exactly one of the two counts is nonzero. ``voice_audio`` is the
            input chunk, or an empty array when ``use_vad_result`` is set and
            the silence run has exceeded ``min_silence_samples``.

        Raises:
            TypeError: if the normalized audio cannot be cast to a tensor.
        """
        x = int2float(audio)
        if not torch.is_tensor(x):
            try:
                x = torch.Tensor(x)
            # Was a bare ``except:`` — that would also swallow
            # KeyboardInterrupt/SystemExit; catch Exception and chain.
            except Exception as exc:
                raise TypeError("Audio cannot be casted to tensor. Cast it manually") from exc

        speech_prob = self.model(x, self.sampling_rate).item()

        window_size_samples = len(x[0]) if x.dim() == 2 else len(x)
        self.current_sample += window_size_samples

        if speech_prob >= self.threshold:
            # Speech: cancel any pending silence run.
            self.temp_end = 0
            return audio, window_size_samples, 0

        # Silence: remember where the run started.
        if not self.temp_end:
            self.temp_end = self.current_sample

        if self.current_sample - self.temp_end < self.min_silence_samples:
            return audio, 0, window_size_samples

        # Long silence: optionally suppress the audio. The empty buffer keeps
        # the input's dtype (was np.float16 — inconsistent with the int16
        # stream; harmless for tobytes() on an empty array, but misleading).
        suppressed = np.array([], dtype=audio.dtype) if self.use_vad_result else audio
        return suppressed, 0, window_size_samples

    def detect_user_speech(self, audio_stream, audio_in_int16=False):
        """Yield ``(voice_bytes, is_final)`` for each chunk of a stream.

        Args:
            audio_stream: iterable of audio chunks; raw byte strings unless
                ``audio_in_int16`` is True, in which case int16 numpy arrays.
            audio_in_int16: set True when chunks are already int16 arrays.

        Yields:
            ``(bytes, bool)`` — the (possibly suppressed) chunk as bytes, and
            True when enough speech followed by enough silence has accumulated
            to treat the utterance as finished.
        """
        last_silence_len = 0
        speech_len = 0

        for data in audio_stream:
            audio_block = np.frombuffer(data, dtype=np.int16) if not audio_in_int16 else data
            wav = audio_block

            is_final = False
            voice_audio, speech_in_wav, last_silent_in_wav = self.apply_vad(wav)

            if speech_in_wav > 0:
                # Speech resets the trailing-silence counter.
                last_silence_len = 0
                speech_len += speech_in_wav
                if self.activity_detected_callback is not None:
                    self.activity_detected_callback()

            last_silence_len += last_silent_in_wav
            # Finalize once enough trailing silence follows enough speech.
            if last_silence_len >= self.final_silence_limit and speech_len >= self.final_speech_limit:
                is_final = True
                last_silence_len = 0
                speech_len = 0

            yield voice_audio.tobytes(), is_final