Spaces:
Build error
Build error
| from numpy import array, zeros, full, argmin, inf, ndim | |
| from scipy.spatial.distance import cdist | |
| from math import isinf | |
| def dtw(x, y, dist, warp=1, w=inf, s=1.0): | |
| """ | |
| Computes Dynamic Time Warping (DTW) of two sequences. | |
| :param array x: N1*M array | |
| :param array y: N2*M array | |
| :param func dist: distance used as cost measure | |
| :param int warp: how many shifts are computed. | |
| :param int w: window size limiting the maximal distance between indices of matched entries |i,j|. | |
| :param float s: weight applied on off-diagonal moves of the path. As s gets larger, the warping path is increasingly biased towards the diagonal | |
| Returns the minimum distance, the cost matrix, the accumulated cost matrix, and the wrap path. | |
| """ | |
| assert len(x) | |
| assert len(y) | |
| assert isinf(w) or (w >= abs(len(x) - len(y))) | |
| assert s > 0 | |
| r, c = len(x), len(y) | |
| if not isinf(w): | |
| D0 = full((r + 1, c + 1), inf) | |
| for i in range(1, r + 1): | |
| D0[i, max(1, i - w):min(c + 1, i + w + 1)] = 0 | |
| D0[0, 0] = 0 | |
| else: | |
| D0 = zeros((r + 1, c + 1)) | |
| D0[0, 1:] = inf | |
| D0[1:, 0] = inf | |
| D1 = D0[1:, 1:] # view | |
| for i in range(r): | |
| for j in range(c): | |
| if (isinf(w) or (max(0, i - w) <= j <= min(c, i + w))): | |
| D1[i, j] = dist(x[i], y[j]) | |
| C = D1.copy() | |
| jrange = range(c) | |
| for i in range(r): | |
| if not isinf(w): | |
| jrange = range(max(0, i - w), min(c, i + w + 1)) | |
| for j in jrange: | |
| min_list = [D0[i, j]] | |
| for k in range(1, warp + 1): | |
| i_k = min(i + k, r) | |
| j_k = min(j + k, c) | |
| min_list += [D0[i_k, j] * s, D0[i, j_k] * s] | |
| D1[i, j] += min(min_list) | |
| if len(x) == 1: | |
| path = zeros(len(y)), range(len(y)) | |
| elif len(y) == 1: | |
| path = range(len(x)), zeros(len(x)) | |
| else: | |
| path = _traceback(D0) | |
| return D1[-1, -1], C, D1, path | |
| def accelerated_dtw(x, y, dist, warp=1): | |
| """ | |
| Computes Dynamic Time Warping (DTW) of two sequences in a faster way. | |
| Instead of iterating through each element and calculating each distance, | |
| this uses the cdist function from scipy (https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.cdist.html) | |
| :param array x: N1*M array | |
| :param array y: N2*M array | |
| :param string or func dist: distance parameter for cdist. When string is given, cdist uses optimized functions for the distance metrics. | |
| If a string is passed, the distance function can be 'braycurtis', 'canberra', 'chebyshev', 'cityblock', 'correlation', 'cosine', 'dice', 'euclidean', 'hamming', 'jaccard', 'kulsinski', 'mahalanobis', 'matching', 'minkowski', 'rogerstanimoto', 'russellrao', 'seuclidean', 'sokalmichener', 'sokalsneath', 'sqeuclidean', 'wminkowski', 'yule'. | |
| :param int warp: how many shifts are computed. | |
| Returns the minimum distance, the cost matrix, the accumulated cost matrix, and the wrap path. | |
| """ | |
| assert len(x) | |
| assert len(y) | |
| if ndim(x) == 1: | |
| x = x.reshape(-1, 1) | |
| if ndim(y) == 1: | |
| y = y.reshape(-1, 1) | |
| r, c = len(x), len(y) | |
| D0 = zeros((r + 1, c + 1)) | |
| D0[0, 1:] = inf | |
| D0[1:, 0] = inf | |
| D1 = D0[1:, 1:] | |
| D0[1:, 1:] = cdist(x, y, dist) | |
| C = D1.copy() | |
| for i in range(r): | |
| for j in range(c): | |
| min_list = [D0[i, j]] | |
| for k in range(1, warp + 1): | |
| min_list += [D0[min(i + k, r), j], | |
| D0[i, min(j + k, c)]] | |
| D1[i, j] += min(min_list) | |
| if len(x) == 1: | |
| path = zeros(len(y)), range(len(y)) | |
| elif len(y) == 1: | |
| path = range(len(x)), zeros(len(x)) | |
| else: | |
| path = _traceback(D0) | |
| return D1[-1, -1], C, D1, path | |
| def _traceback(D): | |
| i, j = array(D.shape) - 2 | |
| p, q = [i], [j] | |
| while (i > 0) or (j > 0): | |
| tb = argmin((D[i, j], D[i, j + 1], D[i + 1, j])) | |
| if tb == 0: | |
| i -= 1 | |
| j -= 1 | |
| elif tb == 1: | |
| i -= 1 | |
| else: # (tb == 2): | |
| j -= 1 | |
| p.insert(0, i) | |
| q.insert(0, j) | |
| return array(p), array(q) | |
| if __name__ == '__main__': | |
| w = inf | |
| s = 1.0 | |
| if 1: # 1-D numeric | |
| from sklearn.metrics.pairwise import manhattan_distances | |
| import numpy as np | |
| x = [0, 0, 1, 1, 2, 4, 2, 1, 2, 0] | |
| x = np.array(x).reshape([-1,1,1]) | |
| y = [1, 1, 1, 2, 2, 2, 2, 3, 2, 0] | |
| y = np.array(y).reshape([-1,1,1]) | |
| dist_fun = manhattan_distances | |
| w = 1 | |
| # s = 1.2 | |
| elif 0: # 2-D numeric | |
| from sklearn.metrics.pairwise import euclidean_distances | |
| x = [[0, 0], [0, 1], [1, 1], [1, 2], [2, 2], [4, 3], [2, 3], [1, 1], [2, 2], [0, 1]] | |
| y = [[1, 0], [1, 1], [1, 1], [2, 1], [4, 3], [4, 3], [2, 3], [3, 1], [1, 2], [1, 0]] | |
| dist_fun = euclidean_distances | |
| else: # 1-D list of strings | |
| from nltk.metrics.distance import edit_distance | |
| # x = ['we', 'shelled', 'clams', 'for', 'the', 'chowder'] | |
| # y = ['class', 'too'] | |
| x = ['i', 'soon', 'found', 'myself', 'muttering', 'to', 'the', 'walls'] | |
| y = ['see', 'drown', 'himself'] | |
| # x = 'we talked about the situation'.split() | |
| # y = 'we talked about the situation'.split() | |
| dist_fun = edit_distance | |
| dist, cost, acc, path = dtw(x, y, dist_fun, w=w, s=s) | |
| # Vizualize | |
| from matplotlib import pyplot as plt | |
| plt.imshow(cost.T, origin='lower', cmap=plt.cm.Reds, interpolation='nearest') | |
| plt.plot(path[0], path[1], '-o') # relation | |
| plt.xticks(range(len(x)), x) | |
| plt.yticks(range(len(y)), y) | |
| plt.xlabel('x') | |
| plt.ylabel('y') | |
| plt.axis('tight') | |
| if isinf(w): | |
| plt.title('Minimum distance: {}, slope weight: {}'.format(dist, s)) | |
| else: | |
| plt.title('Minimum distance: {}, window widht: {}, slope weight: {}'.format(dist, w, s)) | |
| plt.show() | |