Testys commited on
Commit
09a85a1
·
verified ·
1 Parent(s): 95b307f

Create streamer.py

Browse files
Files changed (1) hide show
  1. streamer.py +137 -0
streamer.py ADDED
@@ -0,0 +1,137 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from queue import Queue
2
+ from transformers.generation.streamers import BaseStreamer
3
+ from typing import Optional
4
+ from parler_tts import ParlerTTSForConditionalGeneration
5
+ import numpy as np
6
+ import math
7
+ import torch
8
+
9
+
10
+ class ParlerTTSStreamer(BaseStreamer):
11
+ def __init__(
12
+ self,
13
+ model: ParlerTTSForConditionalGeneration,
14
+ device: Optional[str] = None,
15
+ play_steps: Optional[int] = 10,
16
+ stride: Optional[int] = None,
17
+ timeout: Optional[float] = None,
18
+ ):
19
+ """
20
+ Streamer that stores playback-ready audio in a queue, to be used by a downstream application as an iterator. This is
21
+ useful for applications that benefit from accessing the generated audio in a non-blocking way (e.g. in an interactive
22
+ Gradio demo).
23
+ Parameters:
24
+ model (`ParlerTTSForConditionalGeneration`):
25
+ The Parler-TTS model used to generate the audio waveform.
26
+ device (`str`, *optional*):
27
+ The torch device on which to run the computation. If `None`, will default to the device of the model.
28
+ play_steps (`int`, *optional*, defaults to 10):
29
+ The number of generation steps with which to return the generated audio array. Using fewer steps will
30
+ mean the first chunk is ready faster, but will require more codec decoding steps overall. This value
31
+ should be tuned to your device and latency requirements.
32
+ stride (`int`, *optional*):
33
+ The window (stride) between adjacent audio samples. Using a stride between adjacent audio samples reduces
34
+ the hard boundary between them, giving smoother playback. If `None`, will default to a value equivalent to
35
+ play_steps // 6 in the audio space.
36
+ timeout (`int`, *optional*):
37
+ The timeout for the audio queue. If `None`, the queue will block indefinitely. Useful to handle exceptions
38
+ in `.generate()`, when it is called in a separate thread.
39
+ """
40
+ self.decoder = model.decoder
41
+ self.audio_encoder = model.audio_encoder
42
+ self.generation_config = model.generation_config
43
+ self.device = device if device is not None else model.device
44
+
45
+ # variables used in the streaming process
46
+ self.play_steps = play_steps
47
+ if stride is not None:
48
+ self.stride = stride
49
+ else:
50
+ hop_length = math.floor(self.audio_encoder.config.sampling_rate / self.audio_encoder.config.frame_rate)
51
+ self.stride = hop_length * (play_steps - self.decoder.num_codebooks) // 6
52
+ self.token_cache = None
53
+ self.to_yield = 0
54
+
55
+ # varibles used in the thread process
56
+ self.audio_queue = Queue()
57
+ self.stop_signal = None
58
+ self.timeout = timeout
59
+
60
+ def apply_delay_pattern_mask(self, input_ids):
61
+ # build the delay pattern mask for offsetting each codebook prediction by 1 (this behaviour is specific to Parler)
62
+ _, delay_pattern_mask = self.decoder.build_delay_pattern_mask(
63
+ input_ids[:, :1],
64
+ bos_token_id=self.generation_config.bos_token_id,
65
+ pad_token_id=self.generation_config.decoder_start_token_id,
66
+ max_length=input_ids.shape[-1],
67
+ )
68
+ # apply the pattern mask to the input ids
69
+ input_ids = self.decoder.apply_delay_pattern_mask(input_ids, delay_pattern_mask)
70
+
71
+ # revert the pattern delay mask by filtering the pad token id
72
+ mask = (delay_pattern_mask != self.generation_config.bos_token_id) & (delay_pattern_mask != self.generation_config.pad_token_id)
73
+ input_ids = input_ids[mask].reshape(1, self.decoder.num_codebooks, -1)
74
+ # append the frame dimension back to the audio codes
75
+ input_ids = input_ids[None, ...]
76
+
77
+ # send the input_ids to the correct device
78
+ input_ids = input_ids.to(self.audio_encoder.device)
79
+
80
+ decode_sequentially = (
81
+ self.generation_config.bos_token_id in input_ids
82
+ or self.generation_config.pad_token_id in input_ids
83
+ or self.generation_config.eos_token_id in input_ids
84
+ )
85
+ if not decode_sequentially:
86
+ output_values = self.audio_encoder.decode(
87
+ input_ids,
88
+ audio_scales=[None],
89
+ )
90
+ else:
91
+ sample = input_ids[:, 0]
92
+ sample_mask = (sample >= self.audio_encoder.config.codebook_size).sum(dim=(0, 1)) == 0
93
+ sample = sample[:, :, sample_mask]
94
+ output_values = self.audio_encoder.decode(sample[None, ...], [None])
95
+
96
+ audio_values = output_values.audio_values[0, 0]
97
+ return audio_values.cpu().float().numpy()
98
+
99
+ def put(self, value):
100
+ batch_size = value.shape[0] // self.decoder.num_codebooks
101
+ if batch_size > 1:
102
+ raise ValueError("ParlerTTSStreamer only supports batch size 1")
103
+
104
+ if self.token_cache is None:
105
+ self.token_cache = value
106
+ else:
107
+ self.token_cache = torch.concatenate([self.token_cache, value[:, None]], dim=-1)
108
+
109
+ if self.token_cache.shape[-1] % self.play_steps == 0:
110
+ audio_values = self.apply_delay_pattern_mask(self.token_cache)
111
+ self.on_finalized_audio(audio_values[self.to_yield : -self.stride])
112
+ self.to_yield += len(audio_values) - self.to_yield - self.stride
113
+
114
+ def end(self):
115
+ """Flushes any remaining cache and appends the stop symbol."""
116
+ if self.token_cache is not None:
117
+ audio_values = self.apply_delay_pattern_mask(self.token_cache)
118
+ else:
119
+ audio_values = np.zeros(self.to_yield)
120
+
121
+ self.on_finalized_audio(audio_values[self.to_yield :], stream_end=True)
122
+
123
+ def on_finalized_audio(self, audio: np.ndarray, stream_end: bool = False):
124
+ """Put the new audio in the queue. If the stream is ending, also put a stop signal in the queue."""
125
+ self.audio_queue.put(audio, timeout=self.timeout)
126
+ if stream_end:
127
+ self.audio_queue.put(self.stop_signal, timeout=self.timeout)
128
+
129
+ def __iter__(self):
130
+ return self
131
+
132
+ def __next__(self):
133
+ value = self.audio_queue.get(timeout=self.timeout)
134
+ if not isinstance(value, np.ndarray) and value == self.stop_signal:
135
+ raise StopIteration()
136
+ else:
137
+ return value