|
import numpy as np
|
|
|
|
from numpy.lib.stride_tricks import sliding_window_view
|
|
|
|
def istft(frames, framesize, hopsize):
|
|
frames = np.atleast_2d(frames)
|
|
assert frames.ndim == 2
|
|
|
|
analysis_window_size = np.ravel(framesize)[0]
|
|
synthesis_window_size = np.ravel(framesize)[-1]
|
|
|
|
assert analysis_window_size >= synthesis_window_size
|
|
|
|
A = asymmetric_analysis_window(analysis_window_size, synthesis_window_size) if analysis_window_size != synthesis_window_size else symmetric_window(analysis_window_size)
|
|
S = asymmetric_synthesis_window(analysis_window_size, synthesis_window_size) if analysis_window_size != synthesis_window_size else symmetric_window(synthesis_window_size)
|
|
|
|
W = S * hopsize / np.sum(A * S)
|
|
N = frames.shape[0] * hopsize + analysis_window_size
|
|
|
|
y = np.zeros((N), float)
|
|
|
|
frames[:, 0] = 0
|
|
frames[:, -1] = 0
|
|
frames0 = sliding_window_view(y, analysis_window_size, writeable=True)[::hopsize]
|
|
frames1 = np.fft.irfft(frames, axis=-1, norm='forward') * W
|
|
|
|
for i in range(min(len(frames0), len(frames1))):
|
|
frames0[i] += frames1[i]
|
|
|
|
return y
|
|
|
|
def asymmetric_synthesis_window(analysis_window_size, synthesis_window_size):
|
|
n = analysis_window_size
|
|
m = synthesis_window_size // 2
|
|
|
|
right = symmetric_window(2 * m)
|
|
window = np.zeros(n)
|
|
|
|
window[n-m-m:n-m] = np.square(right[:m]) / symmetric_window(2 * n - 2 * m)[n-m-m:n-m]
|
|
window[-m:] = right[-m:]
|
|
|
|
return window
|
|
|
|
def asymmetric_analysis_window(analysis_window_size, synthesis_window_size):
|
|
n = analysis_window_size
|
|
m = synthesis_window_size // 2
|
|
|
|
window = np.zeros(n)
|
|
window[:n-m] = symmetric_window(2 * n - 2 * m)[:n-m]
|
|
window[-m:] = symmetric_window(2 * m)[-m:]
|
|
|
|
return window
|
|
|
|
def symmetric_window(symmetric_window_size):
|
|
n = symmetric_window_size
|
|
window = 0.5 - 0.5 * np.cos(2 * np.pi * np.arange(n) / n)
|
|
|
|
return window
|
|
|
|
def stft(x, framesize, hopsize):
|
|
x = np.atleast_1d(x)
|
|
assert x.ndim == 1
|
|
|
|
analysis_window_size = np.ravel(framesize)[0]
|
|
synthesis_window_size = np.ravel(framesize)[-1]
|
|
|
|
assert analysis_window_size >= synthesis_window_size
|
|
|
|
W = asymmetric_analysis_window(analysis_window_size, synthesis_window_size) if analysis_window_size != synthesis_window_size else symmetric_window(analysis_window_size)
|
|
|
|
frames0 = sliding_window_view(x, analysis_window_size, writeable=False)[::hopsize]
|
|
frames1 = np.fft.rfft(frames0 * W, axis=-1, norm='forward')
|
|
|
|
return frames1
|
|
|
|
def normalize(frames, frames0):
|
|
for i in range(len(frames)):
|
|
a = np.real(frames0[i])
|
|
b = np.real(frames[i])
|
|
a = np.dot(a, a)
|
|
b = np.dot(b, b)
|
|
|
|
if b == 0: continue
|
|
frames[i] = np.real(frames[i]) * np.sqrt(a / b) + 1j * np.imag(frames[i])
|
|
|
|
return frames
|
|
|
|
def lowpass(cepstrum, quefrency):
|
|
cepstrum[1:quefrency] *= 2
|
|
cepstrum[quefrency+1:] = 0
|
|
|
|
return cepstrum
|
|
|
|
def lifter(frames, quefrency):
|
|
envelopes = np.zeros(frames.shape)
|
|
|
|
for i, frame in enumerate(frames):
|
|
with np.errstate(divide='ignore', invalid='ignore'):
|
|
spectrum = np.log10(np.real(frame))
|
|
|
|
envelopes[i] = np.power(10, np.real(np.fft.rfft(lowpass(np.fft.irfft(spectrum, norm='forward'), quefrency), norm='forward')))
|
|
|
|
return envelopes
|
|
|
|
def resample(x, factor):
|
|
if factor == 1: return x.copy()
|
|
y = np.zeros(x.shape, dtype=x.dtype)
|
|
|
|
n = len(x)
|
|
m = int(n * factor)
|
|
|
|
i = np.arange(min(n, m))
|
|
k = i * (n / m)
|
|
|
|
j = np.trunc(k).astype(int)
|
|
k = k - j
|
|
|
|
ok = (0 <= j) & (j < n - 1)
|
|
y[i[ok]] = k[ok] * x[j[ok] + 1] + (1 - k[ok]) * x[j[ok]]
|
|
|
|
return y
|
|
|
|
def shiftpitch(frames, factors, samplerate):
|
|
for i in range(len(frames)):
|
|
magnitudes = np.vstack([resample(np.real(frames[i]), factor) for factor in factors])
|
|
frequencies = np.vstack([resample(np.imag(frames[i]), factor) * factor for factor in factors])
|
|
|
|
magnitudes[(frequencies <= 0) | (frequencies >= samplerate / 2)] = 0
|
|
mask = np.argmax(magnitudes, axis=0)
|
|
|
|
magnitudes = np.take_along_axis(magnitudes, mask[None,:], axis=0)
|
|
frequencies = np.take_along_axis(frequencies, mask[None,:], axis=0)
|
|
|
|
frames[i] = magnitudes + 1j * frequencies
|
|
|
|
return frames
|
|
|
|
def wrap(x):
|
|
return (x + np.pi) % (2 * np.pi) - np.pi
|
|
|
|
def encode(frames, framesize, hopsize, samplerate):
|
|
M, N = frames.shape
|
|
analysis_framesize = np.ravel(framesize)[0]
|
|
|
|
freqinc = samplerate / analysis_framesize
|
|
phaseinc = 2 * np.pi * hopsize / analysis_framesize
|
|
|
|
buffer = np.zeros(N)
|
|
data = np.zeros((M, N), complex)
|
|
|
|
for m, frame in enumerate(frames):
|
|
arg = np.angle(frame)
|
|
delta = arg - buffer
|
|
|
|
buffer = arg
|
|
|
|
i = np.arange(N)
|
|
data[m] = np.abs(frame) + 1j * ((i + (wrap(delta - i * phaseinc) / phaseinc)) * freqinc)
|
|
|
|
return data
|
|
|
|
def decode(frames, framesize, hopsize, samplerate):
|
|
M, N = frames.shape
|
|
analysis_framesize = np.ravel(framesize)[0]
|
|
synthesis_framesize = np.ravel(framesize)[-1]
|
|
|
|
freqinc = samplerate / analysis_framesize
|
|
phaseinc = 2 * np.pi * hopsize / analysis_framesize
|
|
timeshift = 2 * np.pi * synthesis_framesize * np.arange(N) / N if synthesis_framesize != analysis_framesize else 0
|
|
|
|
buffer = np.zeros(N)
|
|
data = np.zeros((M, N), complex)
|
|
|
|
for m, frame in enumerate(frames):
|
|
i = np.arange(N)
|
|
delta = (i + ((np.imag(frame) - i * freqinc) / freqinc)) * phaseinc
|
|
buffer += delta
|
|
arg = buffer.copy()
|
|
arg -= timeshift
|
|
data[m] = np.real(frame) * np.exp(1j * arg)
|
|
|
|
return data
|
|
|
|
class StftPitchShift:
|
|
def __init__(self, framesize, hopsize, samplerate):
|
|
self.framesize = framesize
|
|
self.hopsize = hopsize
|
|
self.samplerate = samplerate
|
|
|
|
def shiftpitch(self, input, factors = 1, quefrency = 0, distortion = 1, normalization = False):
|
|
input = np.atleast_1d(input)
|
|
dtype = input.dtype
|
|
shape = input.shape
|
|
|
|
input = np.squeeze(input)
|
|
if input.ndim != 1: raise ValueError('input.ndim != 1')
|
|
|
|
if np.issubdtype(dtype, np.integer):
|
|
a, b = np.iinfo(dtype).min, np.iinfo(dtype).max
|
|
input = ((input.astype(float) - a) / (b - a)) * 2 - 1
|
|
elif not np.issubdtype(dtype, np.floating): raise TypeError('not np.issubdtype(dtype, np.floating)')
|
|
|
|
def isnotnormal(x):
|
|
return (np.isinf(x)) | (np.isnan(x)) | (abs(x) < np.finfo(x.dtype).tiny)
|
|
|
|
framesize = self.framesize
|
|
hopsize = self.hopsize
|
|
samplerate = self.samplerate
|
|
|
|
factors = np.asarray(factors).flatten()
|
|
quefrency = int(quefrency * samplerate)
|
|
|
|
frames = encode(stft(input, framesize, hopsize), framesize, hopsize, samplerate)
|
|
|
|
if normalization: frames0 = frames.copy()
|
|
|
|
if quefrency:
|
|
envelopes = lifter(frames, quefrency)
|
|
mask = isnotnormal(envelopes)
|
|
|
|
frames.real /= envelopes
|
|
frames.real[mask] = 0
|
|
|
|
if distortion != 1:
|
|
envelopes[mask] = 0
|
|
|
|
for i in range(len(envelopes)):
|
|
envelopes[i] = resample(envelopes[i], distortion)
|
|
|
|
mask = isnotnormal(envelopes)
|
|
|
|
frames = shiftpitch(frames, factors, samplerate)
|
|
frames.real *= envelopes
|
|
frames.real[mask] = 0
|
|
else: frames = shiftpitch(frames, factors, samplerate)
|
|
|
|
if normalization: frames = normalize(frames, frames0)
|
|
|
|
output = istft(decode(frames, framesize, hopsize, samplerate), framesize, hopsize)
|
|
output.resize(shape, refcheck=False)
|
|
|
|
if np.issubdtype(dtype, np.integer):
|
|
a, b = np.iinfo(dtype).min, np.iinfo(dtype).max
|
|
output = (((output + 1) / 2) * (b - a) + a).clip(a, b).astype(dtype)
|
|
elif output.dtype != dtype: output = output.astype(dtype)
|
|
|
|
assert output.dtype == dtype
|
|
assert output.shape == shape
|
|
|
|
return output |