File size: 3,877 Bytes
9556d07
 
 
 
 
c8c786a
9556d07
 
c8c786a
 
 
 
 
 
 
 
9556d07
 
 
 
c8c786a
 
 
9556d07
 
c8c786a
9556d07
 
 
 
 
 
ea2a9ca
 
 
 
 
9556d07
 
c8c786a
 
 
9556d07
 
 
c8c786a
 
 
 
 
 
 
9556d07
d543411
 
 
9556d07
d543411
 
c8c786a
 
 
 
 
 
 
 
 
 
 
9556d07
c8c786a
 
 
 
 
 
 
 
 
 
 
324dee0
9556d07
 
d543411
 
 
 
9556d07
d543411
 
 
 
9556d07
 
d543411
 
 
 
 
9556d07
d543411
 
c8c786a
d543411
 
 
9556d07
d543411
 
9556d07
c8c786a
9556d07
d543411
 
 
9556d07
d543411
 
 
9556d07
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import torch
import numpy as np
# import sounddevice as sd
import torch
import numpy as np
import datetime


def int2float(sound):
    """Convert int16 PCM samples to float32 in roughly [-1, 1).

    Scaling is by the int16 full-scale constant 32768 (not by the clip's
    own peak), so relative loudness is preserved.  All-zero input is left
    unscaled, which is harmless since every sample is already 0.  The
    result is squeezed to drop singleton axes (e.g. a mono channel dim).
    """
    peak = np.abs(sound).max()
    converted = sound.astype('float32')
    if peak > 0:
        converted *= 1 / 32768
    return converted.squeeze()  # depends on the use case

class VoiceActivityController:
    """Streamed voice-activity detection built on the Silero VAD model.

    Feeds fixed-size audio chunks through Silero VAD, tracks running
    speech/silence sample counts, and marks an utterance as final once
    enough silence follows enough speech.  All ms-based constructor
    parameters are converted to sample counts internally.
    """

    def __init__(
            self, 
            sampling_rate = 16000,
            min_silence_to_final_ms = 500,
            min_speech_to_final_ms = 100,
            min_silence_duration_ms = 100,
            use_vad_result = True,
            activity_detected_callback=None,
            threshold =0.3
        ):
        """Load the Silero VAD model and precompute sample-count limits.

        :param sampling_rate: audio sample rate in Hz (Silero supports 16 kHz).
        :param min_silence_to_final_ms: trailing silence required to finalize.
        :param min_speech_to_final_ms: minimum speech required to finalize.
        :param min_silence_duration_ms: silence length before audio is dropped.
        :param use_vad_result: if True, confirmed-silence chunks are replaced
            by an empty array instead of being passed through.
        :param activity_detected_callback: optional hook stored for callers;
            NOTE(review): invocation is currently disabled (see
            detect_speech_iter) — confirm before relying on it.
        :param threshold: speech probability above which a chunk counts as speech.
        """
        self.activity_detected_callback = activity_detected_callback
        # Downloads (first run) or loads the cached Silero VAD model via
        # torch.hub — requires network access on the first call.
        self.model, self.utils = torch.hub.load(
            repo_or_dir='snakers4/silero-vad',
            model='silero_vad'
        )

        self.sampling_rate = sampling_rate
        # Convert millisecond thresholds to sample counts once, up front.
        self.final_silence_limit = min_silence_to_final_ms * self.sampling_rate / 1000
        self.final_speech_limit = min_speech_to_final_ms * self.sampling_rate / 1000
        self.min_silence_samples = sampling_rate * min_silence_duration_ms / 1000

        self.use_vad_result = use_vad_result
        self.last_marked_chunk = None
        self.threshold = threshold
        self.reset_states()

    def reset_states(self):
        """Reset the VAD model and all per-utterance counters."""
        self.model.reset_states()
        self.temp_end = 0          # sample index where the current silence run began
        self.current_sample = 0    # total samples processed so far

        self.last_silence_len = 0  # trailing silence samples since last speech
        self.speech_len = 0        # accumulated speech samples in current utterance

    def apply_vad(self, audio):
        """Classify one audio chunk as speech or silence.

        :param audio: one chunk of float audio (array-like or tensor);
            presumably float32 in [-1, 1] as produced by int2float —
            TODO confirm against caller.
        :returns: tuple ``(audio_out, speech_samples, silence_samples)``
            where exactly one of the two counts is non-zero.  When
            ``use_vad_result`` is set and the silence run has exceeded
            ``min_silence_samples``, ``audio_out`` is an empty array.
        :raises TypeError: if ``audio`` cannot be converted to a tensor.
        """
        x = audio
        if not torch.is_tensor(x):
            try:
                x = torch.Tensor(x)
            except Exception as e:
                # BUG FIX: was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit and discarded the cause.
                raise TypeError("Audio cannot be casted to tensor. Cast it manually") from e

        speech_prob = self.model(x, self.sampling_rate).item()

        window_size_samples = len(x[0]) if x.dim() == 2 else len(x)
        self.current_sample += window_size_samples

        if speech_prob >= self.threshold:
            # Speech: cancel any pending silence marker.
            self.temp_end = 0
            return audio, window_size_samples, 0

        # Silence: remember where this silence run started.
        if not self.temp_end:
            self.temp_end = self.current_sample

        if self.current_sample - self.temp_end < self.min_silence_samples:
            # Silence run still short — keep the audio (could be a pause).
            return audio, 0, window_size_samples

        # Confirmed silence: optionally suppress the audio.
        # BUG FIX: the empty array was float16, inconsistent with the
        # float32 audio flowing through the rest of the pipeline.
        silent_audio = np.array([], dtype=np.float32) if self.use_vad_result else audio
        return silent_audio, 0, window_size_samples

    def detect_speech_iter(self, data, audio_in_int16 = False):
        """Process one chunk and report whether the utterance is final.

        :param data: one audio chunk (passed to apply_vad unchanged;
            ``audio_in_int16`` is currently unused — kept for interface
            compatibility).
        :returns: ``(voice_audio, is_final)`` where ``is_final`` is True
            once trailing silence reaches ``final_silence_limit`` after at
            least ``final_speech_limit`` samples of speech.
        """
        # BUG FIX: removed a leftover debug print that fired on every chunk.
        wav = data

        is_final = False
        voice_audio, speech_in_wav, last_silent_in_wav = self.apply_vad(wav)

        if speech_in_wav > 0:
            # Speech resets the trailing-silence counter.
            self.last_silence_len = 0
            self.speech_len += speech_in_wav
            # NOTE(review): activity_detected_callback was commented out in
            # the original and is intentionally left un-invoked here.

        self.last_silence_len += last_silent_in_wav
        if self.last_silence_len >= self.final_silence_limit and self.speech_len >= self.final_speech_limit:
            # Enough speech followed by enough silence: finalize and reset.
            is_final = True
            self.last_silence_len = 0
            self.speech_len = 0

        return voice_audio, is_final

    def detect_user_speech(self, audio_stream, audio_in_int16 = False):
        """Yield ``(voice_audio, is_final)`` for each chunk of an audio stream."""
        self.last_silence_len = 0
        self.speech_len = 0

        for data in audio_stream:
            yield self.detect_speech_iter(data, audio_in_int16)