# Spotify playlist/track scraping utilities with a JSON on-disk cache.
| import numpy as np | |
| import json | |
| import os | |
# Track attributes that callers may request via `which_info`.
valid_track_infos = {
    'uri',
    'name',
    'artist_name',
    'popularity',
    'artist_genres',
    'album',
    'artist_popularity',
    'audio_features',
    'audio_analysis',
}
def get_all_tracks_from_playlist_uri(sp, playlist_uri):
    """Return every track item of a playlist, paging through the API 100 at a time.

    Args:
        sp: authenticated spotipy-like client exposing ``playlist_tracks``.
        playlist_uri: playlist identifier accepted by ``sp.playlist_tracks``.

    Returns:
        list: concatenated ``"items"`` entries from every result page.
    """
    tracks = []
    offset = 0
    while True:
        page = sp.playlist_tracks(playlist_uri, offset=offset, limit=100)["items"]
        tracks.extend(page)
        # A short page means we just read the final page.
        if len(page) < 100:
            return tracks
        offset += 100
def update_data_with_audio_features(sp, uris, data):
    """Fetch audio features for up to 100 tracks and store them in ``data``.

    Args:
        sp: authenticated spotipy-like client exposing ``audio_features``.
        uris: track uris (at most 100 — the Spotify endpoint's batch limit).
        data: mapping of track uri -> track item; mutated in place.

    Returns:
        tuple: ``(data, [])`` — the empty list lets callers reset their
        pending-uri accumulator in a single assignment.

    Raises:
        ValueError: if more than 100 uris are supplied (was a bare ``assert``,
            which is stripped under ``python -O``).
    """
    if len(uris) > 100:
        raise ValueError(f"audio_features accepts at most 100 uris, got {len(uris)}")
    if not uris:
        # Nothing pending (callers may flush an empty accumulator): skip the remote call.
        return data, []
    features = sp.audio_features(uris)
    for uri, feature in zip(uris, features):
        data[uri]['track']['audio_features'] = feature
    return data, []
def check_all_track_has_audio_features(data):
    """Assert that every cached track item carries an 'audio_features' entry."""
    for entry in data.values():
        assert 'audio_features' in entry['track']
def get_all_tracks_from_playlists(sp, playlist_uris, verbose=False):
    """Collect every unique track from the given playlists, with genres and audio features.

    Tracks are deduplicated by uri. Audio features are fetched in batches of 100
    (the API limit), and the accumulated data is checkpointed to a temporary JSON
    cache periodically and at the end.

    Args:
        sp: authenticated spotipy-like client.
        playlist_uris: iterable of playlist uris to scan.
        verbose: when True, print progress to stdout.

    Returns:
        dict: track uri -> track item (each enriched with 'genres' and 'audio_features').
    """
    if verbose: print(f'Extracting all tracks from {len(playlist_uris)} playlists.')
    # load data
    cache_path = './cache_track_features_tmp.json'
    # NOTE(review): the `not os.path.exists(cache_path)` guard is disabled, so the
    # cache is wiped at the start of every run. Kept as-is: the pruning loop below
    # compares *track* uris against *playlist* uris and would empty a reloaded
    # cache anyway — confirm intent before re-enabling persistence.
    if True:  # not os.path.exists(cache_path)
        with open(cache_path, 'w') as f:
            json.dump(dict(), f)
    with open(cache_path, 'r') as f:
        data = json.load(f)
    # Drop cached entries whose key is not among the requested playlists.
    for k in list(data.keys()):
        if k not in playlist_uris:
            data.pop(k)
    if verbose: print(f'\t{len(data.keys())} tracks loaded from cache')
    # for each playlist, extract all tracks, remove doubles
    if verbose: print(f'\tScanning tracks for each playlist')
    new_additions = 0
    added_uris = []  # uris still waiting for a batched audio-features lookup
    for i_playlist, playlist_uri in enumerate(playlist_uris):
        new_tracks = get_all_tracks_from_playlist_uri(sp, playlist_uri)
        # remove doubles
        for new_track in new_tracks:
            uri = new_track['track']['uri'].split(':')[-1]
            if uri not in data:  # O(1) dict membership (was building a fresh set per track)
                genres = sp.artist(new_track['track']['artists'][0]['uri'])['genres']
                new_track['track']['genres'] = genres
                data[uri] = new_track
                added_uris.append(uri)
                new_additions += 1
                # When 100 new uris have accumulated, fetch their audio features (API batch limit).
                if len(added_uris) == 100:
                    data, added_uris = update_data_with_audio_features(sp, added_uris, data)
                # Periodic checkpoint: flush pending features and persist the cache.
                if (new_additions + 1) % 1000 == 0:
                    data, added_uris = update_data_with_audio_features(sp, added_uris, data)
                    check_all_track_has_audio_features(data)
                    with open(cache_path, 'w') as f:
                        json.dump(data, f)
        if verbose: print(f"\t\t{i_playlist + 1} playlists scanned ({new_additions} new tracks, total: {len(data.keys())} tracks)")
    if verbose: print('\tDone.')
    # Flush any remaining uris, then persist the final cache.
    data, _ = update_data_with_audio_features(sp, added_uris, data)
    check_all_track_has_audio_features(data)
    with open(cache_path, 'w') as f:
        json.dump(data, f)
    return data
def get_all_tracks_from_user(sp, user_id='bkayf', verbose=False):
    """Collect every unique track across all of a user's playlists.

    For user 'bkayf', results are cached to disk and enriched with audio
    features (fetched in batches of 100); for any other user the collection
    is kept in memory only and the final audio-features flush is skipped.

    Args:
        sp: authenticated spotipy-like client.
        user_id: Spotify user whose playlists are scanned.
        verbose: when True, print progress to stdout.

    Returns:
        dict: track uri -> track item.
    """
    if verbose: print(f'Extracting all tracks from user {user_id}.')
    # load data
    if user_id == 'bkayf':
        cache_path = '../data/bkayf/cache_track_features.json'
        if not os.path.exists(cache_path):
            with open(cache_path, 'w') as f:
                json.dump(dict(), f)
        with open(cache_path, 'r') as f:
            data = json.load(f)
    else:
        data = dict()
    if verbose: print(f'\t{len(data.keys())} tracks loaded from cache')
    # First get all playlists, paging 50 at a time (API page limit).
    offset = 0
    done = False
    playlists = []
    if verbose: print(f'\tScanning playlists.')
    while not done:
        new_playlists = sp.user_playlists(user_id, offset=offset, limit=50)['items']
        playlists += new_playlists
        if len(new_playlists) < 50:
            done = True
            if verbose: print(f'\t\tfrom {offset} to {offset + len(new_playlists)} (complete).')
        else:
            if verbose: print(f'\t\tfrom {offset} to {offset + len(new_playlists)},')
            offset += 50
    # for each playlist, extract all tracks, remove doubles
    if verbose: print(f'\tScanning tracks for each playlist')
    new_additions = 0
    added_uris = []  # uris still waiting for a batched audio-features lookup
    for i_playlist, playlist in enumerate(playlists):
        if (i_playlist + 1) % 5 == 0:
            if verbose: print(f"\t\t{i_playlist + 1} playlists scanned ({new_additions} new tracks, total: {len(data.keys())} tracks)")
        playlist_uri = playlist['uri'].split(':')[-1]
        new_tracks = get_all_tracks_from_playlist_uri(sp, playlist_uri)
        # remove doubles
        for new_track in new_tracks:
            uri = new_track['track']['uri'].split(':')[-1]
            if uri not in data:  # O(1) dict membership (was building a fresh set per track)
                data[uri] = new_track
                added_uris.append(uri)
                new_additions += 1
                # When 100 new uris have accumulated, fetch their audio features (API batch limit).
                if len(added_uris) == 100:
                    data, added_uris = update_data_with_audio_features(sp, added_uris, data)
                # Periodic checkpoint of the on-disk cache (bkayf only).
                if (new_additions + 1) % 1000 == 0 and user_id == "bkayf":
                    data, added_uris = update_data_with_audio_features(sp, added_uris, data)
                    check_all_track_has_audio_features(data)
                    with open(cache_path, 'w') as f:
                        json.dump(data, f)
    if verbose: print('\tDone.')
    if user_id == "bkayf":
        # Flush remaining uris and persist the final cache.
        data, _ = update_data_with_audio_features(sp, added_uris, data)
        check_all_track_has_audio_features(data)
        with open(cache_path, 'w') as f:
            json.dump(data, f)
    return data
def get_uri_from_link(link):
    """Extract the bare uri from a Spotify share link (last path segment, query stripped)."""
    without_query, _, _ = link.partition("?")
    return without_query.rsplit("/", 1)[-1]
def get_track_info_from_playlist_uri(sp, playlist_uri, which_info=None, verbose=False):
    """Gather selected pieces of information for every track in a playlist.

    Args:
        sp: authenticated spotipy-like client.
        playlist_uri: playlist to scan.
        which_info: iterable of info names to collect; each must be one of
            ``valid_track_infos``. Defaults to ['uri'].
        verbose: when True, print the playlist size.

    Returns:
        dict: info name -> list of values, one per track, in playlist order.

    Raises:
        ValueError: if ``which_info`` contains an unknown info name (was a
            bare ``assert``, which is stripped under ``python -O``).
    """
    # Avoid a mutable default argument; None means "just the uris".
    if which_info is None:
        which_info = ['uri']
    if set(which_info) - valid_track_infos:
        raise ValueError(f"Error which_info. Valid infos are: {valid_track_infos}")
    output = dict()
    tracks = get_all_tracks_from_playlist_uri(sp, playlist_uri)
    if verbose: print(f'Playlist with {len(tracks)} tracks.')
    # Prepare artist info if needed (one lookup per track's lead artist).
    if any(w in which_info for w in ['artist_genres', 'artist_popularity', 'artist_name']):
        artist_uris = [x["track"]["artists"][0]["uri"] for x in tracks]
        artist_infos = [sp.artist(artist_uri) for artist_uri in artist_uris]
    for info in which_info:
        if info in ['uri', 'name', 'popularity']:
            output[info] = [x["track"][info] for x in tracks]
        elif info in ['artist_genres', 'artist_popularity', 'artist_name']:
            # 'artist_genres' -> artist_info['genres'], etc.
            output[info] = [artist_info[info.split('_')[1]] for artist_info in artist_infos]
        elif info == 'album':
            # BUGFIX: 'album' was also listed in the first branch, which returned the
            # raw album dict and left this intended name-extraction branch unreachable.
            output[info] = [x["track"][info]["name"] for x in tracks]
        elif info == 'audio_features':
            output[info] = [sp.audio_features(x["track"]["uri"]) for x in tracks]
        elif info == 'audio_analysis':
            output[info] = [sp.audio_analysis(x["track"]["uri"]) for x in tracks]
        else:
            raise NotImplementedError
    return output
| def compute_progress_and_eta(times, iter, total, n_av=3000): | |
| av_time = np.mean(times[-n_av:]) | |
| progress = int(((iter + 1) / total) * 100) | |
| eta_h = int(av_time * (total - iter) // 3600) | |
| eta_m = int((av_time * (total - iter) - (eta_h * 3600)) // 60) | |
| eta_s = int((av_time * (total - iter) - (eta_h * 3600) - eta_m * 60)) | |
| eta = f"Progress: {progress}%, ETA: {eta_h}H{eta_m}M{eta_s}S." | |
| return eta | |