| #!/usr/bin/env python | |
| # -*- encoding: utf-8 -*- | |
| """ | |
| Sequential modeling | |
| =================== | |
| Sequence alignment | |
| ------------------ | |
| .. autosummary:: | |
| :toctree: generated/ | |
| dtw | |
| rqa | |
| Viterbi decoding | |
| ---------------- | |
| .. autosummary:: | |
| :toctree: generated/ | |
| viterbi | |
| viterbi_discriminative | |
| viterbi_binary | |
| Transition matrices | |
| ------------------- | |
| .. autosummary:: | |
| :toctree: generated/ | |
| transition_uniform | |
| transition_loop | |
| transition_cycle | |
| transition_local | |
| """ | |
| from __future__ import annotations | |
| import numpy as np | |
| from scipy.spatial.distance import cdist | |
| from numba import jit | |
| from .util import pad_center, fill_off_diagonal, is_positive_int, tiny, expand_to | |
| from .util.exceptions import ParameterError | |
| from .filters import get_window | |
| from typing import Any, Iterable, List, Optional, Tuple, Union, overload | |
| from typing_extensions import Literal | |
| from ._typing import _WindowSpec, _IntLike_co | |
# Explicit public API of this module (controls the ``import *`` surface).
# NOTE(review): the viterbi* and transition_* functions are defined later
# in the file, outside this excerpt — confirm they are all present.
__all__ = [
    "dtw",
    "dtw_backtracking",
    "rqa",
    "viterbi",
    "viterbi_discriminative",
    "viterbi_binary",
    "transition_uniform",
    "transition_loop",
    "transition_cycle",
    "transition_local",
]
# Typed overloads for dtw().  The return type depends on the combination of
# ``backtrack`` and ``return_steps``, and on whether (X, Y) or a precomputed
# cost matrix C is supplied.  These stubs exist purely for static type
# checking; the runtime implementation follows below.
#
# Fix: each stub must carry @overload (imported above) — without the
# decorator, each ``def dtw`` merely shadows the previous one.
@overload
def dtw(
    X: np.ndarray,
    Y: np.ndarray,
    *,
    metric: str = ...,
    step_sizes_sigma: Optional[np.ndarray] = ...,
    weights_add: Optional[np.ndarray] = ...,
    weights_mul: Optional[np.ndarray] = ...,
    subseq: bool = ...,
    backtrack: Literal[False],
    global_constraints: bool = ...,
    band_rad: float = ...,
    return_steps: Literal[False] = ...,
) -> np.ndarray:
    ...


@overload
def dtw(
    *,
    C: np.ndarray,
    metric: str = ...,
    step_sizes_sigma: Optional[np.ndarray] = ...,
    weights_add: Optional[np.ndarray] = ...,
    weights_mul: Optional[np.ndarray] = ...,
    subseq: bool = ...,
    backtrack: Literal[False],
    global_constraints: bool = ...,
    band_rad: float = ...,
    return_steps: Literal[False] = ...,
) -> np.ndarray:
    ...


@overload
def dtw(
    X: np.ndarray,
    Y: np.ndarray,
    *,
    metric: str = ...,
    step_sizes_sigma: Optional[np.ndarray] = ...,
    weights_add: Optional[np.ndarray] = ...,
    weights_mul: Optional[np.ndarray] = ...,
    subseq: bool = ...,
    backtrack: Literal[False],
    global_constraints: bool = ...,
    band_rad: float = ...,
    return_steps: Literal[True],
) -> Tuple[np.ndarray, np.ndarray]:
    ...


@overload
def dtw(
    *,
    C: np.ndarray,
    metric: str = ...,
    step_sizes_sigma: Optional[np.ndarray] = ...,
    weights_add: Optional[np.ndarray] = ...,
    weights_mul: Optional[np.ndarray] = ...,
    subseq: bool = ...,
    backtrack: Literal[False],
    global_constraints: bool = ...,
    band_rad: float = ...,
    return_steps: Literal[True],
) -> Tuple[np.ndarray, np.ndarray]:
    ...


@overload
def dtw(
    X: np.ndarray,
    Y: np.ndarray,
    *,
    metric: str = ...,
    step_sizes_sigma: Optional[np.ndarray] = ...,
    weights_add: Optional[np.ndarray] = ...,
    weights_mul: Optional[np.ndarray] = ...,
    subseq: bool = ...,
    backtrack: Literal[True] = ...,
    global_constraints: bool = ...,
    band_rad: float = ...,
    return_steps: Literal[False] = ...,
) -> Tuple[np.ndarray, np.ndarray]:
    ...


@overload
def dtw(
    *,
    C: np.ndarray,
    metric: str = ...,
    step_sizes_sigma: Optional[np.ndarray] = ...,
    weights_add: Optional[np.ndarray] = ...,
    weights_mul: Optional[np.ndarray] = ...,
    subseq: bool = ...,
    backtrack: Literal[True] = ...,
    global_constraints: bool = ...,
    band_rad: float = ...,
    return_steps: Literal[False] = ...,
) -> Tuple[np.ndarray, np.ndarray]:
    ...


@overload
def dtw(
    X: np.ndarray,
    Y: np.ndarray,
    *,
    metric: str = ...,
    step_sizes_sigma: Optional[np.ndarray] = ...,
    weights_add: Optional[np.ndarray] = ...,
    weights_mul: Optional[np.ndarray] = ...,
    subseq: bool = ...,
    backtrack: Literal[True] = ...,
    global_constraints: bool = ...,
    band_rad: float = ...,
    return_steps: Literal[True],
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    ...


@overload
def dtw(
    *,
    C: np.ndarray,
    metric: str = ...,
    step_sizes_sigma: Optional[np.ndarray] = ...,
    weights_add: Optional[np.ndarray] = ...,
    weights_mul: Optional[np.ndarray] = ...,
    subseq: bool = ...,
    backtrack: Literal[True] = ...,
    global_constraints: bool = ...,
    band_rad: float = ...,
    return_steps: Literal[True],
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    ...
def dtw(
    X: Optional[np.ndarray] = None,
    Y: Optional[np.ndarray] = None,
    *,
    C: Optional[np.ndarray] = None,
    metric: str = "euclidean",
    step_sizes_sigma: Optional[np.ndarray] = None,
    weights_add: Optional[np.ndarray] = None,
    weights_mul: Optional[np.ndarray] = None,
    subseq: bool = False,
    backtrack: bool = True,
    global_constraints: bool = False,
    band_rad: float = 0.25,
    return_steps: bool = False,
) -> Union[
    np.ndarray, Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray]
]:
    """Dynamic time warping (DTW).

    This function performs a DTW and path backtracking on two sequences.
    We follow the nomenclature and algorithmic approach as described in [#]_.

    .. [#] Meinard Mueller
           Fundamentals of Music Processing — Audio, Analysis, Algorithms, Applications
           Springer Verlag, ISBN: 978-3-319-21944-8, 2015.

    Parameters
    ----------
    X : np.ndarray [shape=(..., K, N)]
        audio feature matrix (e.g., chroma features)

        If ``X`` has more than two dimensions (e.g., for multi-channel inputs), all leading
        dimensions are used when computing distance to ``Y``.
    Y : np.ndarray [shape=(..., K, M)]
        audio feature matrix (e.g., chroma features)
    C : np.ndarray [shape=(N, M)]
        Precomputed distance matrix. If supplied, X and Y must not be supplied and
        ``metric`` will be ignored.
    metric : str
        Identifier for the cost-function as documented
        in `scipy.spatial.distance.cdist()`
    step_sizes_sigma : np.ndarray [shape=[n, 2]]
        Specifies allowed step sizes as used by the dtw.
    weights_add : np.ndarray [shape=[n, ]]
        Additive weights to penalize certain step sizes.
    weights_mul : np.ndarray [shape=[n, ]]
        Multiplicative weights to penalize certain step sizes.
    subseq : bool
        Enable subsequence DTW, e.g., for retrieval tasks.
    backtrack : bool
        Enable backtracking in accumulated cost matrix.
    global_constraints : bool
        Applies global constraints to the cost matrix ``C`` (Sakoe-Chiba band).
    band_rad : float
        The Sakoe-Chiba band radius (1/2 of the width) will be
        ``int(radius*min(C.shape))``.
    return_steps : bool
        If true, the function returns ``steps``, the step matrix, containing
        the indices of the used steps from the cost accumulation step.

    Returns
    -------
    D : np.ndarray [shape=(N, M)]
        accumulated cost matrix.
        D[N, M] is the total alignment cost.
        When doing subsequence DTW, D[N,:] indicates a matching function.
    wp : np.ndarray [shape=(N, 2)]
        Warping path with index pairs.
        Each row of the array contains an index pair (n, m).
        Only returned when ``backtrack`` is True.
    steps : np.ndarray [shape=(N, M)]
        Step matrix, containing the indices of the used steps from the cost
        accumulation step.
        Only returned when ``return_steps`` is True.

    Raises
    ------
    ParameterError
        If you are doing diagonal matching and Y is shorter than X or if an
        incompatible combination of X, Y, and C are supplied.

        If your input dimensions are incompatible.

        If the cost matrix has NaN values.

    Examples
    --------
    >>> import numpy as np
    >>> import matplotlib.pyplot as plt
    >>> y, sr = librosa.load(librosa.ex('brahms'), offset=10, duration=15)
    >>> X = librosa.feature.chroma_cens(y=y, sr=sr)
    >>> noise = np.random.rand(X.shape[0], 200)
    >>> Y = np.concatenate((noise, noise, X, noise), axis=1)
    >>> D, wp = librosa.sequence.dtw(X, Y, subseq=True)
    >>> fig, ax = plt.subplots(nrows=2, sharex=True)
    >>> img = librosa.display.specshow(D, x_axis='frames', y_axis='frames',
    ...                                ax=ax[0])
    >>> ax[0].set(title='DTW cost', xlabel='Noisy sequence', ylabel='Target')
    >>> ax[0].plot(wp[:, 1], wp[:, 0], label='Optimal path', color='y')
    >>> ax[0].legend()
    >>> fig.colorbar(img, ax=ax[0])
    >>> ax[1].plot(D[-1, :] / wp.shape[0])
    >>> ax[1].set(xlim=[0, Y.shape[1]], ylim=[0, 2],
    ...           title='Matching cost function')
    """
    # Default Parameters
    default_steps = np.array([[1, 1], [0, 1], [1, 0]], dtype=np.uint32)
    default_weights_add = np.zeros(3, dtype=np.float64)
    default_weights_mul = np.ones(3, dtype=np.float64)
    if step_sizes_sigma is None:
        # Use the default steps
        step_sizes_sigma = default_steps
        # Use default weights if none are provided
        if weights_add is None:
            weights_add = default_weights_add
        if weights_mul is None:
            weights_mul = default_weights_mul
    else:
        # If we have custom steps but no weights, construct them here
        if weights_add is None:
            weights_add = np.zeros(len(step_sizes_sigma), dtype=np.float64)
        if weights_mul is None:
            weights_mul = np.ones(len(step_sizes_sigma), dtype=np.float64)
        # Make the default step weights infinite so that they are never
        # preferred over custom steps
        default_weights_add.fill(np.inf)
        default_weights_mul.fill(np.inf)
        # Append custom steps and weights to our defaults
        step_sizes_sigma = np.concatenate((default_steps, step_sizes_sigma))
        weights_add = np.concatenate((default_weights_add, weights_add))
        weights_mul = np.concatenate((default_weights_mul, weights_mul))

    # These asserts are bad, but mypy cannot trace the code paths properly
    assert step_sizes_sigma is not None
    assert weights_add is not None
    assert weights_mul is not None

    if np.any(step_sizes_sigma < 0):
        raise ParameterError("step_sizes_sigma cannot contain negative values")
    if len(step_sizes_sigma) != len(weights_add):
        raise ParameterError("len(weights_add) must be equal to len(step_sizes_sigma)")
    if len(step_sizes_sigma) != len(weights_mul):
        raise ParameterError("len(weights_mul) must be equal to len(step_sizes_sigma)")

    if C is None and (X is None or Y is None):
        raise ParameterError("If C is not supplied, both X and Y must be supplied")
    if C is not None and (X is not None or Y is not None):
        raise ParameterError("If C is supplied, both X and Y must not be supplied")

    c_is_transposed = False

    # calculate pair-wise distances, unless already supplied.
    # C_local will keep track of whether the distance matrix was supplied
    # by the user (False) or constructed locally (True)
    C_local = False
    if C is None:
        C_local = True
        # mypy can't figure out that this case does not happen
        assert X is not None and Y is not None

        # take care of dimensions
        X = np.atleast_2d(X)
        Y = np.atleast_2d(Y)

        # Perform some shape-squashing here
        # Put the time axes around front
        # Suppress types because mypy doesn't know these are ndarrays
        X = np.swapaxes(X, -1, 0)  # type: ignore
        Y = np.swapaxes(Y, -1, 0)  # type: ignore

        # Flatten the remaining dimensions
        # Use F-ordering to preserve columns
        X = X.reshape((X.shape[0], -1), order="F")
        Y = Y.reshape((Y.shape[0], -1), order="F")

        try:
            C = cdist(X, Y, metric=metric)
        except ValueError as exc:
            raise ParameterError(
                "scipy.spatial.distance.cdist returned an error.\n"
                "Please provide your input in the form X.shape=(K, N) "
                "and Y.shape=(K, M).\n 1-dimensional sequences should "
                "be reshaped to X.shape=(1, N) and Y.shape=(1, M)."
            ) from exc

        # for subsequence matching:
        # if N > M, Y can be a subsequence of X
        if subseq and (X.shape[0] > Y.shape[0]):
            C = C.T
            c_is_transposed = True

    C = np.atleast_2d(C)

    # if diagonal matching, Y has to be longer than X
    # (X simply cannot be contained in Y)
    if np.array_equal(step_sizes_sigma, np.array([[1, 1]])) and (
        C.shape[0] > C.shape[1]
    ):
        # Fix: the message previously read "X.shape[-11]" (typo for -1)
        raise ParameterError(
            "For diagonal matching: Y.shape[-1] >= X.shape[-1] "
            "(C.shape[1] >= C.shape[0])"
        )

    max_0 = step_sizes_sigma[:, 0].max()
    max_1 = step_sizes_sigma[:, 1].max()

    # check C here for nans before building global constraints
    if np.any(np.isnan(C)):
        raise ParameterError("DTW cost matrix C has NaN values. ")

    if global_constraints:
        # Apply global constraints to the cost matrix
        if not C_local:
            # If C was provided as input, make a copy here
            C = np.copy(C)
        fill_off_diagonal(C, radius=band_rad, value=np.inf)

    # initialize whole matrix with infinity values
    D = np.ones(C.shape + np.array([max_0, max_1])) * np.inf

    # set starting point to C[0, 0]
    D[max_0, max_1] = C[0, 0]

    if subseq:
        D[max_0, max_1:] = C[0, :]

    # initialize step matrix with zeros; entries are overwritten in
    # __dtw_calc_accu_cost() with indices into step_sizes_sigma
    steps = np.zeros(D.shape, dtype=np.int32)

    # these steps correspond to left- (first row) and up-(first column) moves
    steps[0, :] = 1
    steps[:, 0] = 2

    # calculate accumulated cost matrix
    D: np.ndarray
    steps: np.ndarray
    D, steps = __dtw_calc_accu_cost(
        C, D, steps, step_sizes_sigma, weights_mul, weights_add, max_0, max_1
    )

    # delete infinity rows and columns
    D = D[max_0:, max_1:]
    steps = steps[max_0:, max_1:]

    return_values: List[np.ndarray]
    if backtrack:
        wp: np.ndarray
        if subseq:
            if np.all(np.isinf(D[-1])):
                raise ParameterError(
                    "No valid sub-sequence warping path could "
                    "be constructed with the given step sizes."
                )
            start = np.argmin(D[-1, :])
            _wp = __dtw_backtracking(steps, step_sizes_sigma, subseq, start)
        else:
            # perform warping path backtracking
            if np.isinf(D[-1, -1]):
                raise ParameterError(
                    "No valid sub-sequence warping path could "
                    "be constructed with the given step sizes."
                )
            _wp = __dtw_backtracking(steps, step_sizes_sigma, subseq)
            if _wp[-1] != (0, 0):
                raise ParameterError(
                    "Unable to compute a full DTW warping path. "
                    "You may want to try again with subseq=True."
                )
        wp = np.asarray(_wp, dtype=int)

        # since we transposed in the beginning, we have to adjust the index pairs back
        if subseq and (
            (X is not None and Y is not None and X.shape[0] > Y.shape[0])
            or c_is_transposed
            or C.shape[0] > C.shape[1]
        ):
            wp = np.fliplr(wp)
        return_values = [D, wp]
    else:
        return_values = [D]

    if return_steps:
        return_values.append(steps)

    if len(return_values) > 1:
        # Suppressing type check here because mypy can't
        # infer the exact length of the tuple
        return tuple(return_values)  # type: ignore
    else:
        return return_values[0]
# NOTE(review): the dangling "# type: ignore" that preceded this function
# looks like the remnant of a numba @jit decorator line (the file imports
# `jit` at the top) — confirm against the upstream source.
def __dtw_calc_accu_cost(
    C: np.ndarray,
    D: np.ndarray,
    steps: np.ndarray,
    step_sizes_sigma: np.ndarray,
    weights_mul: np.ndarray,
    weights_add: np.ndarray,
    max_0: int,
    max_1: int,
) -> Tuple[np.ndarray, np.ndarray]:  # pragma: no cover
    """Fill the accumulated cost matrix ``D`` by dynamic programming.

    Parameters
    ----------
    C : np.ndarray [shape=(N, M)]
        Pre-computed local cost matrix.
    D : np.ndarray [shape=(N + max_0, M + max_1)]
        Accumulated cost matrix, pre-padded with infinity and seeded at
        the origin cell by the caller.
    steps : np.ndarray [shape=(N + max_0, M + max_1)]
        Output matrix recording, per cell, the index into
        ``step_sizes_sigma`` of the step that achieved the minimum.
    step_sizes_sigma : np.ndarray [shape=[n, 2]]
        Allowed step sizes.
    weights_mul : np.ndarray [shape=[n, ]]
        Multiplicative penalty per step size.
    weights_add : np.ndarray [shape=[n, ]]
        Additive penalty per step size.
    max_0 : int
        Maximum step extent along dim 0 (also the row padding offset).
    max_1 : int
        Maximum step extent along dim 1 (also the column padding offset).

    Returns
    -------
    D : np.ndarray
        The filled accumulated cost matrix.
    steps : np.ndarray
        The filled step-index matrix.

    See Also
    --------
    dtw
    """
    n_rows, n_cols = D.shape
    for i in range(max_0, n_rows):
        for j in range(max_1, n_cols):
            # Local cost for this cell; C is offset by the padding margins
            local_cost = C[i - max_0, j - max_1]
            for idx, (di, dj) in enumerate(step_sizes_sigma):
                candidate = D[i - di, j - dj] + (
                    weights_mul[idx] * local_cost + weights_add[idx]
                )
                # Strict comparison keeps the earliest step index on ties
                if candidate < D[i, j]:
                    D[i, j] = candidate
                    steps[i, j] = idx
    return D, steps
# NOTE(review): the dangling "# type: ignore" that preceded this function
# also looks like a lost numba @jit decorator line — confirm upstream.
def __dtw_backtracking(
    steps: np.ndarray,
    step_sizes_sigma: np.ndarray,
    subseq: bool,
    start: Optional[int] = None,
) -> List[Tuple[int, int]]:  # pragma: no cover
    """Trace a warping path backwards through the step-index matrix.

    Starting from the bottom-right cell (or from column ``start`` of the
    last row, when given), repeatedly undo the recorded step until the
    origin (full DTW) or the first row (subsequence DTW) is reached.

    Parameters
    ----------
    steps : np.ndarray [shape=(N, M)]
        Step-index matrix produced during cost accumulation.
    step_sizes_sigma : np.ndarray [shape=[n, 2]]
        Allowed step sizes, indexed by the values stored in ``steps``.
    subseq : bool
        If True, stop upon reaching the first row instead of (0, 0).
    start : int, optional
        Starting column in the last row (subsequence mode only).

    Returns
    -------
    wp : list of (int, int)
        Index pairs along the path, ordered from the end back to the start.

    See Also
    --------
    dtw
    """
    row = steps.shape[0] - 1
    col = steps.shape[1] - 1 if start is None else start
    path = [(row, col)]
    # Walk backwards; for subsequence DTW only the first row must be
    # reached, otherwise we stop exactly at the origin.
    while (row > 0) if subseq else ((row, col) != (0, 0)):
        chosen = steps[row, col]
        row = row - step_sizes_sigma[chosen][0]
        col = col - step_sizes_sigma[chosen][1]
        # Ran off the edge of the cost matrix: stop without recording
        if min(row, col) < 0:
            break
        path.append((row, col))
    return path
def dtw_backtracking(
    steps: np.ndarray,
    *,
    step_sizes_sigma: Optional[np.ndarray] = None,
    subseq: bool = False,
    start: Optional[Union[int, np.integer[Any]]] = None,
) -> np.ndarray:
    """Backtrack a warping path from a precomputed step matrix.

    Thin public wrapper around the private backtracking routine: it
    prepends the default step set to any user-supplied
    ``step_sizes_sigma`` (mirroring `dtw`) and returns the path as an
    integer array.

    Parameters
    ----------
    steps : np.ndarray [shape=(N, M)]
        Step matrix, containing the indices of the used steps from the
        cost accumulation step.
    step_sizes_sigma : np.ndarray [shape=[n, 2]], optional
        Additional allowed step sizes; the defaults
        ``[[1, 1], [0, 1], [1, 0]]`` are always included first.
    subseq : bool
        Enable subsequence DTW, e.g., for retrieval tasks.
    start : int, optional
        Start column index for backtracking (only allowed for
        ``subseq=True``).

    Returns
    -------
    wp : np.ndarray [shape=(K, 2)]
        Warping path index pairs, ordered from the end back to the start.

    Raises
    ------
    ParameterError
        If ``start`` is set while ``subseq`` is False.

    See Also
    --------
    dtw
    """
    if start is not None and subseq is False:
        raise ParameterError(
            f"start is only allowed to be set if subseq is True (start={start}, subseq={subseq})"
        )

    # Canonical DTW steps are always permitted, matching dtw()'s behavior
    base_steps = np.array([[1, 1], [0, 1], [1, 0]], dtype=np.uint32)
    if step_sizes_sigma is None:
        sigma = base_steps
    else:
        # Custom steps are appended after the defaults
        sigma = np.concatenate((base_steps, step_sizes_sigma))

    return np.asarray(__dtw_backtracking(steps, sigma, subseq, start), dtype=int)
# Typed overloads for rqa(): the return type depends on ``backtrack``
# (score matrix only vs. score matrix plus path).
#
# Fix: each stub must carry @overload (imported above) — without the
# decorator, each ``def rqa`` merely shadows the previous one.
@overload
def rqa(
    sim: np.ndarray,
    *,
    gap_onset: float = ...,
    gap_extend: float = ...,
    knight_moves: bool = ...,
    backtrack: Literal[False],
) -> np.ndarray:
    ...


@overload
def rqa(
    sim: np.ndarray,
    *,
    gap_onset: float = ...,
    gap_extend: float = ...,
    knight_moves: bool = ...,
    backtrack: Literal[True] = ...,
) -> Tuple[np.ndarray, np.ndarray]:
    ...


@overload
def rqa(
    sim: np.ndarray,
    *,
    gap_onset: float = ...,
    gap_extend: float = ...,
    knight_moves: bool = ...,
    backtrack: bool = ...,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    ...
def rqa(
    sim: np.ndarray,
    *,
    gap_onset: float = 1,
    gap_extend: float = 1,
    knight_moves: bool = True,
    backtrack: bool = True,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """Recurrence quantification analysis (RQA)

    This function implements different forms of RQA as described by
    Serra, Serra, and Andrzejak (SSA). [#]_ These methods take as input
    a self- or cross-similarity matrix ``sim``, and calculate the value
    of path alignments by dynamic programming.

    Note that unlike dynamic time warping (`dtw`), alignment paths here are
    maximized, not minimized, so the input should measure similarity rather
    than distance.

    The simplest RQA method, denoted as `L` (SSA equation 3) and equivalent
    to the method described by Eckman, Kamphorst, and Ruelle [#]_, accumulates
    the length of diagonal paths with positive values in the input:

    - ``score[i, j] = score[i-1, j-1] + 1`` if ``sim[i, j] > 0``
    - ``score[i, j] = 0`` otherwise.

    The second method, denoted as `S` (SSA equation 4), is similar to the first,
    but allows for "knight moves" (as in the chess piece) in addition to strict
    diagonal moves:

    - ``score[i, j] = max(score[i-1, j-1], score[i-2, j-1], score[i-1, j-2]) + 1``
      if ``sim[i, j] > 0``
    - ``score[i, j] = 0`` otherwise.

    The third method, denoted as `Q` (SSA equations 5 and 6) extends this by
    allowing gaps in the alignment that incur some cost, rather than a hard
    reset to 0 whenever ``sim[i, j] == 0``.
    Gaps are penalized by two additional parameters, ``gap_onset`` and ``gap_extend``,
    which are subtracted from the value of the alignment path every time a gap
    is introduced or extended (respectively).

    Note that setting ``gap_onset`` and ``gap_extend`` to `np.inf` recovers the second
    method, and disabling knight moves recovers the first.

    .. [#] Serrà, Joan, Xavier Serra, and Ralph G. Andrzejak.
           "Cross recurrence quantification for cover song identification."
           New Journal of Physics 11, no. 9 (2009): 093017.

    .. [#] Eckmann, J. P., S. Oliffson Kamphorst, and D. Ruelle.
           "Recurrence plots of dynamical systems."
           World Scientific Series on Nonlinear Science Series A 16 (1995): 441-446.

    Parameters
    ----------
    sim : np.ndarray [shape=(N, M), non-negative]
        The similarity matrix to use as input.

        This can either be a recurrence matrix (self-similarity)
        or a cross-similarity matrix between two sequences.
    gap_onset : float > 0
        Penalty for introducing a gap to an alignment sequence
    gap_extend : float > 0
        Penalty for extending a gap in an alignment sequence
    knight_moves : bool
        If ``True`` (default), allow for "knight moves" in the alignment,
        e.g., ``(n, m) => (n + 1, m + 2)`` or ``(n + 2, m + 1)``.
        If ``False``, only allow for diagonal moves ``(n, m) => (n + 1, m + 1)``.
    backtrack : bool
        If ``True``, return the alignment path.
        If ``False``, only return the score matrix.

    Returns
    -------
    score : np.ndarray [shape=(N, M)]
        The alignment score matrix. ``score[n, m]`` is the cumulative value of
        the best alignment sequence ending in frames ``n`` and ``m``.
    path : np.ndarray [shape=(k, 2)] (optional)
        If ``backtrack=True``, ``path`` contains a list of pairs of aligned frames
        in the best alignment sequence.

        ``path[i] = [n, m]`` indicates that row ``n`` aligns to column ``m``.

    See Also
    --------
    librosa.segment.recurrence_matrix
    librosa.segment.cross_similarity
    dtw

    Examples
    --------
    Simple diagonal path enhancement (L-mode)

    >>> import numpy as np
    >>> import matplotlib.pyplot as plt
    >>> y, sr = librosa.load(librosa.ex('nutcracker'), duration=30)
    >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr)
    >>> # Use time-delay embedding to reduce noise
    >>> chroma_stack = librosa.feature.stack_memory(chroma, n_steps=10, delay=3)
    >>> # Build recurrence, suppress self-loops within 1 second
    >>> rec = librosa.segment.recurrence_matrix(chroma_stack, width=43,
    ...                                         mode='affinity',
    ...                                         metric='cosine')
    >>> # using infinite cost for gaps enforces strict path continuation
    >>> L_score, L_path = librosa.sequence.rqa(rec,
    ...                                        gap_onset=np.inf,
    ...                                        gap_extend=np.inf,
    ...                                        knight_moves=False)
    >>> fig, ax = plt.subplots(ncols=2)
    >>> librosa.display.specshow(rec, x_axis='frames', y_axis='frames', ax=ax[0])
    >>> ax[0].set(title='Recurrence matrix')
    >>> librosa.display.specshow(L_score, x_axis='frames', y_axis='frames', ax=ax[1])
    >>> ax[1].set(title='Alignment score matrix')
    >>> ax[1].plot(L_path[:, 1], L_path[:, 0], label='Optimal path', color='c')
    >>> ax[1].legend()
    >>> ax[1].label_outer()

    Full alignment using gaps and knight moves

    >>> # New gaps cost 5, extending old gaps cost 10 for each step
    >>> score, path = librosa.sequence.rqa(rec, gap_onset=5, gap_extend=10)
    >>> fig, ax = plt.subplots(ncols=2, sharex=True, sharey=True)
    >>> librosa.display.specshow(rec, x_axis='frames', y_axis='frames', ax=ax[0])
    >>> ax[0].set(title='Recurrence matrix')
    >>> librosa.display.specshow(score, x_axis='frames', y_axis='frames', ax=ax[1])
    >>> ax[1].set(title='Alignment score matrix')
    >>> ax[1].plot(path[:, 1], path[:, 0], label='Optimal path', color='c')
    >>> ax[1].legend()
    >>> ax[1].label_outer()
    """
    # Fix: the original messages contained a literal "{}" placeholder that
    # was never interpolated; use f-strings to report the offending value.
    # NOTE(review): the guard allows 0 while the message says "strictly
    # positive" — condition kept as-is to preserve behavior; confirm intent.
    if gap_onset < 0:
        raise ParameterError(f"gap_onset={gap_onset} must be strictly positive")
    if gap_extend < 0:
        raise ParameterError(f"gap_extend={gap_extend} must be strictly positive")

    # Forward DP pass produces the score matrix and backtracking pointers
    score: np.ndarray
    pointers: np.ndarray
    score, pointers = __rqa_dp(sim, gap_onset, gap_extend, knight_moves)
    if backtrack:
        path = __rqa_backtrack(score, pointers)
        return score, path

    return score
# type: ignore
# NOTE(review): the bare "# type: ignore" above looks like the remnant of a
# numba @jit decorator line (cf. the `jit` import at the top) — confirm.
def __rqa_dp(
    sim: np.ndarray, gap_onset: float, gap_extend: float, knight: bool
) -> Tuple[np.ndarray, np.ndarray]:  # pragma: no cover
    """RQA dynamic programming implementation

    Forward pass for `rqa`: fills the alignment score matrix and a
    pointer (backtracking) matrix in a single sweep.

    NOTE(review): the base-case handling below indexes ``sim[1, 1]``,
    so ``sim`` appears to be assumed at least 2x2 — confirm against callers.
    """
    # The output array
    score = np.zeros(sim.shape, dtype=sim.dtype)
    # The backtracking array
    backtrack = np.zeros(sim.shape, dtype=np.int8)
    # These are place-holder arrays to limit the points being considered
    # at each step of the DP
    #
    # If knight moves are enabled, values are indexed according to
    # [(-1,-1), (-1, -2), (-2, -1)]
    #
    # If knight moves are disabled, then only the first entry is used.
    #
    # Using dummy vectors here makes the code a bit cleaner down below.
    sim_values = np.zeros(3)
    score_values = np.zeros(3)
    vec = np.zeros(3)
    if knight:
        # Initial limit is for the base case: diagonal + one knight
        init_limit = 2
        # Otherwise, we have 3 positions
        limit = 3
    else:
        # Diagonal moves only: consider a single predecessor everywhere
        init_limit = 1
        limit = 1
    # backtracking rubric:
    #   0 ==> diagonal move
    #   1 ==> knight move up
    #   2 ==> knight move left
    #  -1 ==> reset without inclusion
    #  -2 ==> reset with inclusion (ie positive value at init)
    # Initialize the first row and column with the data
    score[0, :] = sim[0, :]
    score[:, 0] = sim[:, 0]
    # backtracking initialization: the first row and column are all resets
    # if there's a positive link here, it's an inclusive reset
    for i in range(sim.shape[0]):
        if sim[i, 0]:
            backtrack[i, 0] = -2
        else:
            backtrack[i, 0] = -1
    for j in range(sim.shape[1]):
        if sim[0, j]:
            backtrack[0, j] = -2
        else:
            backtrack[0, j] = -1
    # Initialize the 1-1 case using only the diagonal
    if sim[1, 1] > 0:
        score[1, 1] = score[0, 0] + sim[1, 1]
        backtrack[1, 1] = 0
    else:
        # No link at (1, 1): carry the diagonal score forward minus a gap
        # penalty — onset if (0, 0) was a link, extend otherwise — floored
        # at zero.
        link = sim[0, 0] > 0
        score[1, 1] = max(0, score[0, 0] - (link) * gap_onset - (~link) * gap_extend)
        if score[1, 1] > 0:
            backtrack[1, 1] = 0
        else:
            backtrack[1, 1] = -1
    # Initialize the second row with diagonal and left-knight moves
    # (the up-knight predecessor (-2, -1) would fall outside the matrix here)
    i = 1
    for j in range(2, sim.shape[1]):
        score_values[:-1] = (score[i - 1, j - 1], score[i - 1, j - 2])
        sim_values[:-1] = (sim[i - 1, j - 1], sim[i - 1, j - 2])
        t_values = sim_values > 0
        if sim[i, j] > 0:
            backtrack[i, j] = np.argmax(score_values[:init_limit])
            score[i, j] = score_values[backtrack[i, j]] + sim[i, j]  # or + 1 for binary
        else:
            # Gap step: subtract onset where the predecessor was a link,
            # extend where it was not; keep the best candidate (can go
            # negative here, clipped below).
            vec[:init_limit] = (
                score_values[:init_limit]
                - t_values[:init_limit] * gap_onset
                - (~t_values[:init_limit]) * gap_extend
            )
            backtrack[i, j] = np.argmax(vec[:init_limit])
            score[i, j] = max(0, vec[backtrack[i, j]])
            # Is it a reset?
            if score[i, j] == 0:
                backtrack[i, j] = -1
    # Initialize the second column with diagonal and up-knight moves
    # (the left-knight predecessor (-1, -2) would fall outside the matrix here)
    j = 1
    for i in range(2, sim.shape[0]):
        score_values[:-1] = (score[i - 1, j - 1], score[i - 2, j - 1])
        sim_values[:-1] = (sim[i - 1, j - 1], sim[i - 2, j - 1])
        t_values = sim_values > 0
        if sim[i, j] > 0:
            backtrack[i, j] = np.argmax(score_values[:init_limit])
            score[i, j] = score_values[backtrack[i, j]] + sim[i, j]  # or + 1 for binary
        else:
            vec[:init_limit] = (
                score_values[:init_limit]
                - t_values[:init_limit] * gap_onset
                - (~t_values[:init_limit]) * gap_extend
            )
            backtrack[i, j] = np.argmax(vec[:init_limit])
            score[i, j] = max(0, vec[backtrack[i, j]])
            # Is it a reset?
            if score[i, j] == 0:
                backtrack[i, j] = -1
    # Now fill in the rest of the table
    for i in range(2, sim.shape[0]):
        for j in range(2, sim.shape[1]):
            score_values[:] = (
                score[i - 1, j - 1],
                score[i - 1, j - 2],
                score[i - 2, j - 1],
            )
            sim_values[:] = (sim[i - 1, j - 1], sim[i - 1, j - 2], sim[i - 2, j - 1])
            t_values = sim_values > 0
            if sim[i, j] > 0:
                # if knight is true, it's max of (-1,-1), (-1, -2), (-2, -1)
                # otherwise, it's just the diagonal move (-1, -1)
                # for backtracking purposes, if the max is 0 then it's the start of a new sequence
                # if the max is non-zero, then we extend the existing sequence
                backtrack[i, j] = np.argmax(score_values[:limit])
                score[i, j] = (
                    score_values[backtrack[i, j]] + sim[i, j]
                )  # or + 1 for binary
            else:
                # if the max of our options is negative, then it's a hard reset
                # otherwise, it's a skip move
                vec[:limit] = (
                    score_values[:limit]
                    - t_values[:limit] * gap_onset
                    - (~t_values[:limit]) * gap_extend
                )
                backtrack[i, j] = np.argmax(vec[:limit])
                score[i, j] = max(0, vec[backtrack[i, j]])
                # Is it a reset?
                if score[i, j] == 0:
                    backtrack[i, j] = -1
    return score, backtrack
def __rqa_backtrack(score, pointers):
    """RQA path backtracking

    Given the score matrix and backtracking index array,
    reconstruct the optimal path.

    Parameters
    ----------
    score : np.ndarray
        The RQA score matrix
    pointers : np.ndarray
        The backtracking pointer array produced alongside ``score``

    Returns
    -------
    path : np.ndarray [shape=(k, 2), dtype=uint]
        The (row, column) indices of the optimal alignment path,
        in increasing order.  Empty (shape ``(0, 2)``) if there is
        no alignment path at all.
    """
    # backtracking rubric:
    #   0 ==> diagonal move
    #   1 ==> knight move up
    #   2 ==> knight move left
    #  -1 ==> reset (sim = 0)
    #  -2 ==> start of sequence (sim > 0)

    # This array maps the backtracking values to the
    # relative index offsets
    offsets = [(-1, -1), (-1, -2), (-2, -1)]

    # Find the maximum to end the path
    idx = list(np.unravel_index(np.argmax(score), score.shape))

    # Construct the path in reverse order: appending is O(1) per step,
    # whereas the previous insert(0, ...) approach was O(n) per step
    # (O(n^2) total for a length-n path).
    rev_path: List = []
    while True:
        bt_index = pointers[tuple(idx)]

        # A -1 indicates a non-inclusive reset
        # this can only happen when sim[idx] == 0,
        # and a reset with zero score should not be included
        # in the path.  In this case, we're done.
        if bt_index == -1:
            break

        # Other bt_index values are okay for inclusion
        rev_path.append(idx)

        # -2 indicates beginning of sequence,
        # so we can't backtrack any further
        if bt_index == -2:
            break

        # Otherwise, step to the predecessor cell and continue
        idx = [idx[_] + offsets[bt_index][_] for _ in range(len(idx))]

    # If there's no alignment path at all, eg an empty cross-similarity
    # matrix, return a properly shaped and typed array
    if not rev_path:
        return np.empty((0, 2), dtype=np.uint)

    # Reverse once at the end to restore ascending order
    return np.asarray(rev_path[::-1], dtype=np.uint)
| # type: ignore | |
def _viterbi(
    log_prob: np.ndarray, log_trans: np.ndarray, log_p_init: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:  # pragma: no cover
    """Core Viterbi algorithm.

    This is intended for internal use only.

    Parameters
    ----------
    log_prob : np.ndarray [shape=(T, m)]
        ``log_prob[t, s]`` is the conditional log-likelihood
        ``log P[X = X(t) | State(t) = s]``
    log_trans : np.ndarray [shape=(m, m)]
        The log transition matrix
        ``log_trans[i, j] = log P[State(t+1) = j | State(t) = i]``
    log_p_init : np.ndarray [shape=(m,)]
        log of the initial state distribution

    Returns
    -------
    state : np.ndarray [shape=(T,), dtype=uint16]
        The most likely state sequence
    logp : np.ndarray [shape=(1,)]
        The log probability of ``state``, as a length-1 array
        (an array rather than a scalar, so that callers can wrap
        this function with ``np.vectorize``)
    """
    n_steps, n_states = log_prob.shape
    # state[t] will hold the decoded state index at time t
    state = np.zeros(n_steps, dtype=np.uint16)
    # value[t, j]: best cumulative log-probability of any path ending in state j at time t
    value = np.zeros((n_steps, n_states), dtype=np.float64)
    # ptr[t, j]: the state at time t-1 on the best path ending in state j at time t
    ptr = np.zeros((n_steps, n_states), dtype=np.uint16)
    # factor in initial state distribution
    value[0] = log_prob[0] + log_p_init
    for t in range(1, n_steps):
        # Want V[t, j] <- p[t, j] * max_k V[t-1, k] * A[k, j]
        #   assume at time t-1 we were in state k
        #   transition k -> j
        # Broadcast over rows:
        #   Tout[k, j] = V[t-1, k] * A[k, j]
        # then take the max over columns
        # We'll do this in log-space for stability
        trans_out = value[t - 1] + log_trans.T
        # Unroll the max/argmax loop to enable numba support
        for j in range(n_states):
            ptr[t, j] = np.argmax(trans_out[j])
            # value[t, j] = log_prob[t, j] + np.max(trans_out[j])
            value[t, j] = log_prob[t, j] + trans_out[j, ptr[t][j]]
    # Now roll backward
    # Get the last state
    state[-1] = np.argmax(value[-1])
    # Follow the back-pointers from the end of the sequence to the start
    for t in range(n_steps - 2, -1, -1):
        state[t] = ptr[t + 1, state[t + 1]]
    # Slice (-1: rather than -1) so that logp has shape (1,), as promised above
    logp = value[-1:, state[-1]]
    return state, logp
@overload
def viterbi(
    prob: np.ndarray,
    transition: np.ndarray,
    *,
    p_init: Optional[np.ndarray] = ...,
    return_logp: Literal[True],
) -> Tuple[np.ndarray, np.ndarray]:
    # Typing-only overload: with return_logp=True, callers get (states, logp).
    # Without @overload this def is a dead redefinition that type checkers
    # reject; the decorator registers it as a signature stub instead.
    ...
@overload
def viterbi(
    prob: np.ndarray,
    transition: np.ndarray,
    *,
    p_init: Optional[np.ndarray] = ...,
    return_logp: Literal[False] = ...,
) -> np.ndarray:
    # Typing-only overload: with return_logp omitted/False, callers get the
    # state array only.  @overload marks this as a signature stub rather
    # than a runtime redefinition.
    ...
def viterbi(
    prob: np.ndarray,
    transition: np.ndarray,
    *,
    p_init: Optional[np.ndarray] = None,
    return_logp: bool = False,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """Viterbi decoding from observation likelihoods.

    Given observation likelihoods ``prob[s, t]`` (the conditional
    likelihood of the observation at time ``t`` having been generated
    by state ``s``) and a transition matrix ``transition[i, j]`` (the
    conditional probability of moving from state ``i`` to state ``j``),
    the Viterbi algorithm [#]_ recovers the most likely state sequence.

    .. [#] Viterbi, Andrew. "Error bounds for convolutional codes and an
        asymptotically optimum decoding algorithm."
        IEEE transactions on Information Theory 13.2 (1967): 260-269.

    Parameters
    ----------
    prob : np.ndarray [shape=(..., n_states, n_steps), non-negative]
        ``prob[..., s, t]`` is the probability of the observation at time
        ``t`` being generated by state ``s``.
    transition : np.ndarray [shape=(n_states, n_states), non-negative]
        ``transition[i, j]`` is the probability of a transition from i->j.
        Each row must sum to 1.
    p_init : np.ndarray [shape=(n_states,)]
        Optional: initial state distribution.
        If not provided, a uniform distribution is assumed.
    return_logp : bool
        If ``True``, also return the log-likelihood of the state sequence.

    Returns
    -------
    Either ``states`` or ``(states, logp)``:
    states : np.ndarray [shape=(..., n_steps,)]
        The most likely state sequence.
        If ``prob`` contains multiple channels of input, then each channel is
        decoded independently.
    logp : scalar [float] or np.ndarray
        If ``return_logp=True``, the log probability of ``states`` given
        the observations.

    Raises
    ------
    ParameterError
        If ``transition`` has the wrong shape, has negative entries, or has
        rows that do not sum to 1; if ``prob`` contains values outside
        ``[0, 1]``; or if ``p_init`` is not a valid distribution.

    See Also
    --------
    viterbi_discriminative : Viterbi decoding from state likelihoods

    Examples
    --------
    Example from https://en.wikipedia.org/wiki/Viterbi_algorithm#Example

    In this example, we have two states ``healthy`` and ``fever``, with
    initial probabilities 60% and 40%, three possible observations
    (``normal``, ``cold``, and ``dizzy``), and emission probabilities
    ``healthy => {normal: 50%, cold: 40%, dizzy: 10%}`` and
    ``fever => {normal: 10%, cold: 30%, dizzy: 60%}``.
    The self-transition probabilities are
    ``healthy => healthy (70%)`` and ``fever => fever (60%)``.

    Over three days, we observe ``[normal, cold, dizzy]`` and decode the
    maximum-likelihood state assignment for each day:

    >>> p_init = np.array([0.6, 0.4])
    >>> p_emit = np.array([[0.5, 0.4, 0.1],
    ...                    [0.1, 0.3, 0.6]])
    >>> p_trans = np.array([[0.7, 0.3], [0.4, 0.6]])
    >>> path, logp = librosa.sequence.viterbi(p_emit, p_trans, p_init=p_init,
    ...                                       return_logp=True)
    >>> print(logp, path)
    -4.19173690823075 [0 0 1]
    """
    n_states, n_steps = prob.shape[-2:]

    # The transition matrix must be square and match the state count
    if transition.shape != (n_states, n_states):
        raise ParameterError(
            f"transition.shape={transition.shape}, must be "
            f"(n_states, n_states)={n_states, n_states}"
        )

    # Each row of the transition matrix must be a probability distribution
    if np.any(transition < 0) or not np.allclose(transition.sum(axis=1), 1):
        raise ParameterError(
            "Invalid transition matrix: must be non-negative "
            "and sum to 1 on each row."
        )

    if np.any(prob < 0) or np.any(prob > 1):
        raise ParameterError("Invalid probability values: must be between 0 and 1.")

    # Small offset to keep log() away from -inf
    eps = tiny(prob)

    if p_init is None:
        # Default: uniform distribution over states
        p_init = np.full(n_states, 1.0 / n_states)
    elif (
        np.any(p_init < 0)
        or not np.allclose(p_init.sum(), 1)
        or p_init.shape != (n_states,)
    ):
        raise ParameterError(f"Invalid initial state distribution: p_init={p_init}")

    # Move everything into log-space for numerical stability
    log_trans = np.log(transition + eps)
    log_prob = np.log(prob + eps)
    log_p_init = np.log(p_init + eps)

    def _decode(channel):
        # The core decoder expects time on the leading axis: transpose in...
        _state, _logp = _viterbi(channel.T, log_trans, log_p_init)
        # ...and back out again on return
        return _state.T, _logp

    states: np.ndarray
    logp: np.ndarray
    if log_prob.ndim == 2:
        # Single channel: decode directly
        states, logp = _decode(log_prob)
    else:
        # Multichannel: vectorize the decoder over all leading axes
        decode_channels = np.vectorize(
            _decode, otypes=[np.uint16, np.float64], signature="(s,t)->(t),(1)"
        )
        states, logp = decode_channels(log_prob)

    # Drop the trailing length-1 axis carried by the core decoder
    logp = logp[..., 0]

    return (states, logp) if return_logp else states
@overload
def viterbi_discriminative(
    prob: np.ndarray,
    transition: np.ndarray,
    *,
    p_state: Optional[np.ndarray] = ...,
    p_init: Optional[np.ndarray] = ...,
    return_logp: Literal[False] = ...,
) -> np.ndarray:
    # Typing-only overload: with return_logp omitted/False, callers get the
    # state array only.  @overload marks this as a signature stub rather
    # than a runtime redefinition.
    ...
@overload
def viterbi_discriminative(
    prob: np.ndarray,
    transition: np.ndarray,
    *,
    p_state: Optional[np.ndarray] = ...,
    p_init: Optional[np.ndarray] = ...,
    return_logp: Literal[True],
) -> Tuple[np.ndarray, np.ndarray]:
    # Typing-only overload: with return_logp=True, callers get (states, logp).
    # @overload marks this as a signature stub rather than a redefinition.
    ...
@overload
def viterbi_discriminative(
    prob: np.ndarray,
    transition: np.ndarray,
    *,
    p_state: Optional[np.ndarray] = ...,
    p_init: Optional[np.ndarray] = ...,
    return_logp: bool,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    # Typing-only fallback overload for a non-literal bool return_logp.
    # @overload marks this as a signature stub rather than a redefinition.
    ...
def viterbi_discriminative(
    prob: np.ndarray,
    transition: np.ndarray,
    *,
    p_state: Optional[np.ndarray] = None,
    p_init: Optional[np.ndarray] = None,
    return_logp: bool = False,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """Viterbi decoding from discriminative state predictions.

    Given a sequence of conditional state predictions ``prob[s, t]``,
    indicating the conditional likelihood of state ``s`` given the
    observation at time ``t``, and a transition matrix ``transition[i, j]``
    which encodes the conditional probability of moving from state ``i``
    to state ``j``, the Viterbi algorithm computes the most likely sequence
    of states from the observations.

    This implementation uses the standard Viterbi decoding algorithm
    for observation likelihood sequences, under the assumption that
    ``P[Obs(t) | State(t) = s]`` is proportional to
    ``P[State(t) = s | Obs(t)] / P[State(t) = s]``, where the denominator
    is the marginal probability of state ``s`` occurring as given by ``p_state``.

    Note that because the denominator ``P[State(t) = s]`` is not explicitly
    calculated, the resulting probabilities (or log-probabilities) are not
    normalized.  If using the `return_logp=True` option (see below),
    be aware that the "probabilities" may not sum to (and may exceed) 1.

    Parameters
    ----------
    prob : np.ndarray [shape=(..., n_states, n_steps), non-negative]
        ``prob[s, t]`` is the probability of state ``s`` conditional on
        the observation at time ``t``.
        Must be non-negative and sum to 1 along each column.
    transition : np.ndarray [shape=(n_states, n_states), non-negative]
        ``transition[i, j]`` is the probability of a transition from i->j.
        Each row must sum to 1.
    p_state : np.ndarray [shape=(n_states,)]
        Optional: marginal probability distribution over states,
        must be non-negative and sum to 1.
        If not provided, a uniform distribution is assumed.
    p_init : np.ndarray [shape=(n_states,)]
        Optional: initial state distribution.
        If not provided, it is assumed to be uniform.
    return_logp : bool
        If ``True``, return the log-likelihood of the state sequence.

    Returns
    -------
    Either ``states`` or ``(states, logp)``:
    states : np.ndarray [shape=(..., n_steps,)]
        The most likely state sequence.
        If ``prob`` contains multiple input channels,
        then each channel is decoded independently.
    logp : scalar [float] or np.ndarray
        If ``return_logp=True``, the (unnormalized) log probability
        of ``states`` given the observations.

    See Also
    --------
    viterbi :
        Viterbi decoding from observation likelihoods
    viterbi_binary :
        Viterbi decoding for multi-label, conditional state likelihoods

    Examples
    --------
    This example constructs a simple, template-based discriminative chord estimator,
    using CENS chroma as input features.

    .. note:: this chord model is not accurate enough to use in practice. It is only
        intended to demonstrate how to use discriminative Viterbi decoding.

    >>> # Create templates for major, minor, and no-chord qualities
    >>> maj_template = np.array([1,0,0, 0,1,0, 0,1,0, 0,0,0])
    >>> min_template = np.array([1,0,0, 1,0,0, 0,1,0, 0,0,0])
    >>> N_template   = np.array([1,1,1, 1,1,1, 1,1,1, 1,1,1.]) / 4.
    >>> # Generate the weighting matrix that maps chroma to labels
    >>> weights = np.zeros((25, 12), dtype=float)
    >>> labels = ['C:maj', 'C#:maj', 'D:maj', 'D#:maj', 'E:maj', 'F:maj',
    ...           'F#:maj', 'G:maj', 'G#:maj', 'A:maj', 'A#:maj', 'B:maj',
    ...           'C:min', 'C#:min', 'D:min', 'D#:min', 'E:min', 'F:min',
    ...           'F#:min', 'G:min', 'G#:min', 'A:min', 'A#:min', 'B:min',
    ...           'N']
    >>> for c in range(12):
    ...     weights[c, :] = np.roll(maj_template, c) # c:maj
    ...     weights[c + 12, :] = np.roll(min_template, c)  # c:min
    >>> weights[-1] = N_template  # the last row is the no-chord class
    >>> # Make a self-loop transition matrix over 25 states
    >>> trans = librosa.sequence.transition_loop(25, 0.9)
    >>> # Load in audio and make features
    >>> y, sr = librosa.load(librosa.ex('nutcracker'), duration=15)
    >>> # Suppress percussive elements
    >>> y = librosa.effects.harmonic(y, margin=4)
    >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr)
    >>> # Map chroma (observations) to class (state) likelihoods
    >>> probs = np.exp(weights.dot(chroma))  # P[class | chroma] ~= exp(template' chroma)
    >>> probs /= probs.sum(axis=0, keepdims=True)  # probabilities must sum to 1 in each column
    >>> # Compute independent frame-wise estimates
    >>> chords_ind = np.argmax(probs, axis=0)
    >>> # And viterbi estimates
    >>> chords_vit = librosa.sequence.viterbi_discriminative(probs, trans)
    >>> # Plot the features and prediction map
    >>> import matplotlib.pyplot as plt
    >>> fig, ax = plt.subplots(nrows=2)
    >>> librosa.display.specshow(chroma, x_axis='time', y_axis='chroma', ax=ax[0])
    >>> librosa.display.specshow(weights, x_axis='chroma', ax=ax[1])
    >>> ax[1].set(yticks=np.arange(25) + 0.5, yticklabels=labels, ylabel='Chord')
    >>> # And plot the results
    >>> fig, ax = plt.subplots()
    >>> librosa.display.specshow(probs, x_axis='time', cmap='gray', ax=ax)
    >>> times = librosa.times_like(chords_vit)
    >>> ax.scatter(times, chords_ind + 0.25, color='lime', alpha=0.5, marker='+',
    ...            s=15, label='Independent')
    >>> ax.scatter(times, chords_vit - 0.25, color='deeppink', alpha=0.5, marker='o',
    ...            s=15, label='Viterbi')
    >>> ax.set(yticks=np.unique(chords_vit),
    ...        yticklabels=[labels[i] for i in np.unique(chords_vit)])
    >>> ax.legend()
    """
    n_states, n_steps = prob.shape[-2:]
    # The transition matrix must be square and match the state count
    if transition.shape != (n_states, n_states):
        raise ParameterError(
            f"transition.shape={transition.shape}, must be "
            f"(n_states, n_states)={n_states, n_states}"
        )
    # Each row of the transition matrix must be a probability distribution
    if np.any(transition < 0) or not np.allclose(transition.sum(axis=1), 1):
        raise ParameterError(
            "Invalid transition matrix: must be non-negative "
            "and sum to 1 on each row."
        )
    # Each column of prob (axis=-2 sums over states) must be a distribution
    if np.any(prob < 0) or not np.allclose(prob.sum(axis=-2), 1):
        raise ParameterError(
            "Invalid probability values: each column must "
            "sum to 1 and be non-negative"
        )
    # Compute log-likelihoods while avoiding log-underflow
    epsilon = tiny(prob)
    # Compute marginal log probabilities while avoiding underflow
    if p_state is None:
        # Default: uniform marginal distribution over states
        p_state = np.empty(n_states)
        p_state.fill(1.0 / n_states)
    elif p_state.shape != (n_states,):
        raise ParameterError(
            "Marginal distribution p_state must have shape (n_states,). "
            f"Got p_state.shape={p_state.shape}"
        )
    elif np.any(p_state < 0) or not np.allclose(p_state.sum(axis=-1), 1):
        raise ParameterError(f"Invalid marginal state distribution: p_state={p_state}")
    if p_init is None:
        # Default: uniform initial distribution
        p_init = np.empty(n_states)
        p_init.fill(1.0 / n_states)
    elif (
        np.any(p_init < 0)
        or not np.allclose(p_init.sum(), 1)
        or p_init.shape != (n_states,)
    ):
        raise ParameterError(f"Invalid initial state distribution: p_init={p_init}")
    # By Bayes' rule, P[X | Y] * P[Y] = P[Y | X] * P[X]
    # P[X] is constant for the sake of maximum likelihood inference
    # and P[Y] is given by the marginal distribution p_state.
    #
    # So we have P[X | y] \propto P[Y | x] / P[Y]
    # if X = observation and Y = states, this can be done in log space as
    # log P[X | y] \propto \log P[Y | x] - \log P[Y]
    log_p_init = np.log(p_init + epsilon)
    log_trans = np.log(transition + epsilon)
    log_marginal = np.log(p_state + epsilon)
    # reshape to broadcast against prob
    log_marginal = expand_to(log_marginal, ndim=prob.ndim, axes=-2)
    # Bayes correction: subtract the marginal to turn posteriors into
    # (unnormalized) observation log-likelihoods
    log_prob = np.log(prob + epsilon) - log_marginal

    def _helper(lp):
        # Transpose input: the core decoder expects time on the leading axis
        _state, logp = _viterbi(lp.T, log_trans, log_p_init)
        # Transpose outputs for return
        return _state.T, logp

    states: np.ndarray
    logp: np.ndarray
    if log_prob.ndim == 2:
        # Single channel: decode directly
        states, logp = _helper(log_prob)
    else:
        # Vectorize the helper over all leading (channel) axes
        __viterbi = np.vectorize(
            _helper, otypes=[np.uint16, np.float64], signature="(s,t)->(t),(1)"
        )
        states, logp = __viterbi(log_prob)
    # Flatten out the trailing dimension
    logp = logp[..., 0]
    if return_logp:
        return states, logp
    return states
@overload
def viterbi_binary(
    prob: np.ndarray,
    transition: np.ndarray,
    *,
    p_state: Optional[np.ndarray] = ...,
    p_init: Optional[np.ndarray] = ...,
    return_logp: Literal[False] = ...,
) -> np.ndarray:
    # Typing-only overload: with return_logp omitted/False, callers get the
    # state array only.  @overload marks this as a signature stub rather
    # than a runtime redefinition.
    ...
@overload
def viterbi_binary(
    prob: np.ndarray,
    transition: np.ndarray,
    *,
    p_state: Optional[np.ndarray] = ...,
    p_init: Optional[np.ndarray] = ...,
    return_logp: Literal[True],
) -> Tuple[np.ndarray, np.ndarray]:
    # Typing-only overload: with return_logp=True, callers get (states, logp).
    # @overload marks this as a signature stub rather than a redefinition.
    ...
@overload
def viterbi_binary(
    prob: np.ndarray,
    transition: np.ndarray,
    *,
    p_state: Optional[np.ndarray] = ...,
    p_init: Optional[np.ndarray] = ...,
    return_logp: bool = ...,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    # Typing-only fallback overload for a non-literal bool return_logp.
    # @overload marks this as a signature stub rather than a redefinition.
    ...
def viterbi_binary(
    prob: np.ndarray,
    transition: np.ndarray,
    *,
    p_state: Optional[np.ndarray] = None,
    p_init: Optional[np.ndarray] = None,
    return_logp: bool = False,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """Viterbi decoding from binary (multi-label), discriminative state predictions.

    Given a sequence of conditional state predictions ``prob[s, t]``,
    indicating the conditional likelihood of state ``s`` being active
    conditional on observation at time ``t``, and a 2x2 transition matrix
    ``transition`` which encodes the conditional probability of moving from
    state ``s`` to state ``~s`` (not-``s``), the Viterbi algorithm computes the
    most likely sequence of states from the observations.

    This function differs from `viterbi_discriminative` in that it does not assume the
    states to be mutually exclusive. `viterbi_binary` is implemented by
    transforming the multi-label decoding problem to a collection
    of binary Viterbi problems (one for each *state* or label).

    The output is a binary matrix ``states[s, t]`` indicating whether each
    state ``s`` is active at time ``t``.

    Like `viterbi_discriminative`, the probabilities of the optimal state sequences
    are not normalized here.  If using the `return_logp=True` option (see below),
    be aware that the "probabilities" may not sum to (and may exceed) 1.

    Parameters
    ----------
    prob : np.ndarray [shape=(..., n_steps,) or (..., n_states, n_steps)], non-negative
        ``prob[s, t]`` is the probability of state ``s`` being active
        conditional on the observation at time ``t``.
        Must be non-negative and less than 1.

        If ``prob`` is 1-dimensional, it is expanded to shape ``(1, n_steps)``.

        If ``prob`` contains multiple input channels, then each channel is decoded independently.
    transition : np.ndarray [shape=(2, 2) or (n_states, 2, 2)], non-negative
        If 2-dimensional, the same transition matrix is applied to each sub-problem.
        ``transition[0, i]`` is the probability of the state going from inactive to ``i``,
        ``transition[1, i]`` is the probability of the state going from active to ``i``.
        Each row must sum to 1.

        If 3-dimensional, ``transition[s]`` is interpreted as the 2x2 transition matrix
        for state label ``s``.
    p_state : np.ndarray [shape=(n_states,)]
        Optional: marginal probability for each state (between [0,1]).
        If not provided, a uniform distribution (0.5 for each state)
        is assumed.
    p_init : np.ndarray [shape=(n_states,)]
        Optional: initial state distribution.
        If not provided, it is assumed to be uniform.
    return_logp : bool
        If ``True``, return the (unnormalized) log-likelihood of the state sequences.

    Returns
    -------
    Either ``states`` or ``(states, logp)``:
    states : np.ndarray [shape=(..., n_states, n_steps)]
        The most likely state sequence.
    logp : np.ndarray [shape=(..., n_states,)]
        If ``return_logp=True``, the (unnormalized) log probability of each
        state activation sequence ``states``

    See Also
    --------
    viterbi :
        Viterbi decoding from observation likelihoods
    viterbi_discriminative :
        Viterbi decoding for discriminative (mutually exclusive) state predictions

    Examples
    --------
    In this example, we have a sequence of binary state likelihoods that we want to de-noise
    under the assumption that state changes are relatively uncommon.  Positive predictions
    should only be retained if they persist for multiple steps, and any transient predictions
    should be considered as errors.  This use case arises frequently in problems such as
    instrument recognition, where state activations tend to be stable over time, but subject
    to abrupt changes (e.g., when an instrument joins the mix).

    We assume that the 0 state has a self-transition probability of 90%, and the 1 state
    has a self-transition probability of 70%.  We assume the marginal and initial
    probability of either state is 50%.

    >>> trans = np.array([[0.9, 0.1], [0.3, 0.7]])
    >>> prob = np.array([0.1, 0.7, 0.4, 0.3, 0.8, 0.9, 0.8, 0.2, 0.6, 0.3])
    >>> librosa.sequence.viterbi_binary(prob, trans, p_state=0.5, p_init=0.5)
    array([[0, 0, 0, 0, 1, 1, 1, 0, 0, 0]])
    """
    # Promote 1-d input (single label) to (1, n_steps)
    prob = np.atleast_2d(prob)
    n_states, n_steps = prob.shape[-2:]
    if transition.shape == (2, 2):
        # A single shared transition matrix: replicate it for every label
        transition = np.tile(transition, (n_states, 1, 1))
    elif transition.shape != (n_states, 2, 2):
        # Report the expected full shape tuple, not just n_states,
        # consistent with the analogous message in `viterbi`
        raise ParameterError(
            f"transition.shape={transition.shape}, must be (2, 2) or "
            f"(n_states, 2, 2)={(n_states, 2, 2)}"
        )
    if np.any(transition < 0) or not np.allclose(transition.sum(axis=-1), 1):
        raise ParameterError(
            "Invalid transition matrix: must be non-negative "
            "and sum to 1 on each row."
        )
    if np.any(prob < 0) or np.any(prob > 1):
        raise ParameterError("Invalid probability values: prob must be between [0, 1]")
    if p_state is None:
        # Default marginal: each label active with probability 1/2
        p_state = np.full(n_states, 0.5)
    else:
        # Allow scalar input for the single-label case
        p_state = np.atleast_1d(p_state)
    assert p_state is not None  # for static type checkers
    if p_state.shape != (n_states,) or np.any(p_state < 0) or np.any(p_state > 1):
        raise ParameterError(f"Invalid marginal state distributions: p_state={p_state}")
    if p_init is None:
        # Default initial distribution: each label active with probability 1/2
        p_init = np.full(n_states, 0.5)
    else:
        p_init = np.atleast_1d(p_init)
    assert p_init is not None  # for static type checkers
    if p_init.shape != (n_states,) or np.any(p_init < 0) or np.any(p_init > 1):
        raise ParameterError(f"Invalid initial state distributions: p_init={p_init}")
    # Pre-allocate outputs and scratch buffers for the binary sub-problems
    shape_prefix = list(prob.shape[:-2])
    states = np.empty(shape_prefix + [n_states, n_steps], dtype=np.uint16)
    logp = np.empty(shape_prefix + [n_states])
    prob_binary = np.empty(shape_prefix + [2, n_steps])
    p_state_binary = np.empty(2)
    p_init_binary = np.empty(2)
    for state in range(n_states):
        # Channel 0 is "label inactive", channel 1 is "label active"
        prob_binary[..., 0, :] = 1 - prob[..., state, :]
        prob_binary[..., 1, :] = prob[..., state, :]
        p_state_binary[0] = 1 - p_state[state]
        p_state_binary[1] = p_state[state]
        p_init_binary[0] = 1 - p_init[state]
        p_init_binary[1] = p_init[state]
        # Decode each label independently as a two-state problem
        states[..., state, :], logp[..., state] = viterbi_discriminative(
            prob_binary,
            transition[state],
            p_state=p_state_binary,
            p_init=p_init_binary,
            return_logp=True,
        )
    if return_logp:
        return states, logp
    return states
def transition_uniform(n_states: int) -> np.ndarray:
    """Construct a uniform transition matrix over ``n_states``.

    Every transition is equally likely, regardless of the current state.

    Parameters
    ----------
    n_states : int > 0
        The number of states

    Returns
    -------
    transition : np.ndarray [shape=(n_states, n_states)]
        ``transition[i, j] = 1./n_states``

    Raises
    ------
    ParameterError
        If ``n_states`` is not a positive integer.

    Examples
    --------
    >>> librosa.sequence.transition_uniform(3)
    array([[0.333, 0.333, 0.333],
           [0.333, 0.333, 0.333],
           [0.333, 0.333, 0.333]])
    """
    if not is_positive_int(n_states):
        raise ParameterError(f"n_states={n_states} must be a positive integer")

    # Every entry carries the same mass: 1 / n_states
    return np.full((n_states, n_states), 1.0 / n_states, dtype=np.float64)
def transition_loop(n_states: int, prob: Union[float, Iterable[float]]) -> np.ndarray:
    """Construct a self-loop transition matrix over ``n_states``.

    The resulting matrix satisfies:

        - ``transition[i, i] = p`` for all ``i``
        - ``transition[i, j] = (1 - p) / (n_states - 1)`` for all ``j != i``

    This type of transition matrix is appropriate when states tend to be
    locally stable, and there is no additional structure between different
    states.  This is primarily useful for de-noising frame-wise predictions.

    Parameters
    ----------
    n_states : int > 1
        The number of states
    prob : float in [0, 1] or iterable, length=n_states
        If a scalar, this is the probability of a self-transition.

        If a vector of length ``n_states``, ``p[i]`` is the probability of self-transition in state ``i``

    Returns
    -------
    transition : np.ndarray [shape=(n_states, n_states)]
        The transition matrix

    Raises
    ------
    ParameterError
        If ``n_states`` is not an integer greater than 1, or if ``prob``
        has the wrong length or contains values outside ``[0, 1]``.

    Examples
    --------
    >>> librosa.sequence.transition_loop(3, 0.5)
    array([[0.5 , 0.25, 0.25],
           [0.25, 0.5 , 0.25],
           [0.25, 0.25, 0.5 ]])

    >>> librosa.sequence.transition_loop(3, [0.8, 0.5, 0.25])
    array([[0.8  , 0.1  , 0.1  ],
           [0.25 , 0.5  , 0.25 ],
           [0.375, 0.375, 0.25 ]])
    """
    if not (is_positive_int(n_states) and (n_states > 1)):
        raise ParameterError(f"n_states={n_states} must be a positive integer > 1")

    # Promote a scalar self-transition probability to one value per state
    prob = np.asarray(prob, dtype=np.float64)
    if prob.ndim == 0:
        prob = np.tile(prob, n_states)

    if prob.shape != (n_states,):
        raise ParameterError(
            f"prob={prob} must have length equal to n_states={n_states}"
        )

    if np.any(prob < 0) or np.any(prob > 1):
        raise ParameterError(f"prob={prob} must have values in the range [0, 1]")

    # Spread each state's leftover mass uniformly over the other states...
    transition = np.repeat(
        ((1.0 - prob) / (n_states - 1))[:, np.newaxis], n_states, axis=1
    )
    # ...and put the self-transition probability on the diagonal
    np.fill_diagonal(transition, prob)

    return transition
def transition_cycle(n_states: int, prob: Union[float, Iterable[float]]) -> np.ndarray:
    """Construct a cyclic transition matrix over ``n_states``.

    The resulting matrix satisfies:

        - ``transition[i, i] = p``
        - ``transition[i, i + 1] = (1 - p)``

    This type of transition matrix is appropriate for state spaces
    with cyclical structure, such as metrical position within a bar.
    For example, a song in 4/4 time has state transitions of the form

        1->{1, 2}, 2->{2, 3}, 3->{3, 4}, 4->{4, 1}.

    Parameters
    ----------
    n_states : int > 1
        The number of states
    prob : float in [0, 1] or iterable, length=n_states
        If a scalar, this is the probability of a self-transition.

        If a vector of length ``n_states``, ``p[i]`` is the probability of
        self-transition in state ``i``

    Returns
    -------
    transition : np.ndarray [shape=(n_states, n_states)]
        The transition matrix

    Raises
    ------
    ParameterError
        If ``n_states`` is not an integer greater than 1, or if ``prob``
        has the wrong length or contains values outside ``[0, 1]``.

    Examples
    --------
    >>> librosa.sequence.transition_cycle(4, 0.9)
    array([[0.9, 0.1, 0. , 0. ],
           [0. , 0.9, 0.1, 0. ],
           [0. , 0. , 0.9, 0.1],
           [0.1, 0. , 0. , 0.9]])
    """
    if not (is_positive_int(n_states) and n_states > 1):
        raise ParameterError(f"n_states={n_states} must be a positive integer > 1")

    # Promote a scalar self-transition probability to one value per state
    prob = np.asarray(prob, dtype=np.float64)
    if prob.ndim == 0:
        prob = np.tile(prob, n_states)

    if prob.shape != (n_states,):
        raise ParameterError(
            f"prob={prob} must have length equal to n_states={n_states}"
        )

    if np.any(prob < 0) or np.any(prob > 1):
        raise ParameterError(f"prob={prob} must have values in the range [0, 1]")

    # Self-loops on the diagonal; leftover mass moves to the next state,
    # wrapping around from the last state back to the first
    transition = np.zeros((n_states, n_states), dtype=np.float64)
    rows = np.arange(n_states)
    transition[rows, rows] = prob
    transition[rows, (rows + 1) % n_states] = 1.0 - prob

    return transition
def transition_local(
    n_states: int,
    width: Union[int, Iterable[int]],
    *,
    window: _WindowSpec = "triangle",
    wrap: bool = False,
) -> np.ndarray:
    """Construct a localized transition matrix.
    The transition matrix will have the following properties:
        - ``transition[i, j] = 0`` if ``|i - j| > width``
        - ``transition[i, i]`` is maximal
        - ``transition[i, i - width//2 : i + width//2]`` has shape ``window``
    This type of transition matrix is appropriate for state spaces
    that discretely approximate continuous variables, such as in fundamental
    frequency estimation.
    Parameters
    ----------
    n_states : int > 1
        The number of states
    width : int >= 1 or iterable
        The maximum number of states to treat as "local".
        If iterable, it should have length equal to ``n_states``,
        and specify the width independently for each state.
    window : str, callable, or window specification
        The window function to determine the shape of the "local" distribution.
        Any window specification supported by `filters.get_window` will work here.
        .. note:: Certain windows (e.g., 'hann') are identically 0 at the boundaries,
            and so effectively have ``width-2`` non-zero values. You may have to expand
            ``width`` to get the desired behavior.
    wrap : bool
        If ``True``, then state locality ``|i - j|`` is computed modulo ``n_states``.
        If ``False`` (default), then locality is absolute.
    See Also
    --------
    librosa.filters.get_window
    Returns
    -------
    transition : np.ndarray [shape=(n_states, n_states)]
        The transition matrix
    Examples
    --------
    Triangular distributions with and without wrapping
    >>> librosa.sequence.transition_local(5, 3, window='triangle', wrap=False)
    array([[0.667, 0.333, 0.   , 0.   , 0.   ],
           [0.25 , 0.5  , 0.25 , 0.   , 0.   ],
           [0.   , 0.25 , 0.5  , 0.25 , 0.   ],
           [0.   , 0.   , 0.25 , 0.5  , 0.25 ],
           [0.   , 0.   , 0.   , 0.333, 0.667]])
    >>> librosa.sequence.transition_local(5, 3, window='triangle', wrap=True)
    array([[0.5 , 0.25, 0.  , 0.  , 0.25],
           [0.25, 0.5 , 0.25, 0.  , 0.  ],
           [0.  , 0.25, 0.5 , 0.25, 0.  ],
           [0.  , 0.  , 0.25, 0.5 , 0.25],
           [0.25, 0.  , 0.  , 0.25, 0.5 ]])
    Uniform local distributions with variable widths and no wrapping
    >>> librosa.sequence.transition_local(5, [1, 2, 3, 3, 1], window='ones', wrap=False)
    array([[1.   , 0.   , 0.   , 0.   , 0.   ],
           [0.5  , 0.5  , 0.   , 0.   , 0.   ],
           [0.   , 0.333, 0.333, 0.333, 0.   ],
           [0.   , 0.   , 0.333, 0.333, 0.333],
           [0.   , 0.   , 0.   , 0.   , 1.   ]])
    """
    if not (is_positive_int(n_states) and n_states > 1):
        raise ParameterError(f"n_states={n_states} must be a positive integer > 1")
    # Promote a scalar width to a per-state width vector
    width = np.asarray(width, dtype=int)
    if width.ndim == 0:
        width = np.tile(width, n_states)
    if width.shape != (n_states,):
        raise ParameterError(
            f"width={width} must have length equal to n_states={n_states}"
        )
    if np.any(width < 1):
        raise ParameterError(f"width={width} must be at least 1")
    transition = np.zeros((n_states, n_states), dtype=np.float64)
    # Fill in the widths. This is inefficient, but simple
    for i, width_i in enumerate(width):
        # Build a length-n_states row with the window centered in the middle
        # (pad_center places the width_i-sample window at index ~n_states//2)
        trans_row = pad_center(
            get_window(window, width_i, fftbins=False), size=n_states
        )
        # Circularly shift the window so its peak lands on column i.
        # The offset n_states // 2 + i + 1 moves the window's center from
        # the middle of the row onto the diagonal position for state i.
        trans_row = np.roll(trans_row, n_states // 2 + i + 1)
        if not wrap:
            # Knock out the off-diagonal-band elements that wrapped around
            # the row edges, keeping only columns within width_i//2 of i
            trans_row[min(n_states, i + width_i // 2 + 1) :] = 0
            trans_row[: max(0, i - width_i // 2)] = 0
        transition[i] = trans_row
    # Row-normalize so each row is a valid probability distribution
    transition /= transition.sum(axis=1, keepdims=True)
    return transition