Spaces:
Runtime error
Runtime error
| import os | |
| import shutil | |
| import hashlib | |
| import time | |
| LOGS_FOLDER = '/content/Applio-RVC-Fork/logs' | |
| WEIGHTS_FOLDER = '/content/Applio-RVC-Fork/weights' | |
| GOOGLE_DRIVE_PATH = '/content/drive/MyDrive/RVC_Backup' | |
| def import_google_drive_backup(): | |
| print("Importing Google Drive backup...") | |
| GOOGLE_DRIVE_PATH = '/content/drive/MyDrive/RVC_Backup' # change this to your Google Drive path | |
| LOGS_FOLDER = '/content/Applio-RVC-Fork/logs' | |
| WEIGHTS_FOLDER = '/content/Applio-RVC-Fork/weights' | |
| weights_exist = False | |
| files_to_copy = [] | |
| weights_to_copy = [] | |
| def handle_files(root, files, is_weight_files=False): | |
| for filename in files: | |
| filepath = os.path.join(root, filename) | |
| if filename.endswith('.pth') and is_weight_files: | |
| weights_exist = True | |
| backup_filepath = os.path.join(WEIGHTS_FOLDER, os.path.relpath(filepath, GOOGLE_DRIVE_PATH)) | |
| else: | |
| backup_filepath = os.path.join(LOGS_FOLDER, os.path.relpath(filepath, GOOGLE_DRIVE_PATH)) | |
| backup_folderpath = os.path.dirname(backup_filepath) | |
| if not os.path.exists(backup_folderpath): | |
| os.makedirs(backup_folderpath) | |
| print(f'Created folder: {backup_folderpath}', flush=True) | |
| if is_weight_files: | |
| weights_to_copy.append((filepath, backup_filepath)) | |
| else: | |
| files_to_copy.append((filepath, backup_filepath)) | |
| for root, dirs, files in os.walk(os.path.join(GOOGLE_DRIVE_PATH, 'logs')): | |
| handle_files(root, files) | |
| for root, dirs, files in os.walk(os.path.join(GOOGLE_DRIVE_PATH, 'weights')): | |
| handle_files(root, files, True) | |
| # Copy files in batches | |
| total_files = len(files_to_copy) | |
| start_time = time.time() | |
| for i, (source, dest) in enumerate(files_to_copy, start=1): | |
| with open(source, 'rb') as src, open(dest, 'wb') as dst: | |
| shutil.copyfileobj(src, dst, 1024*1024) # 1MB buffer size | |
| # Report progress every 5 seconds or after every 100 files, whichever is less frequent | |
| if time.time() - start_time > 5 or i % 100 == 0: | |
| print(f'\rCopying file {i} of {total_files} ({i * 100 / total_files:.2f}%)', end="") | |
| start_time = time.time() | |
| print(f'\nImported {len(files_to_copy)} files from Google Drive backup') | |
| # Copy weights in batches | |
| total_weights = len(weights_to_copy) | |
| start_time = time.time() | |
| for i, (source, dest) in enumerate(weights_to_copy, start=1): | |
| with open(source, 'rb') as src, open(dest, 'wb') as dst: | |
| shutil.copyfileobj(src, dst, 1024*1024) # 1MB buffer size | |
| # Report progress every 5 seconds or after every 100 files, whichever is less frequent | |
| if time.time() - start_time > 5 or i % 100 == 0: | |
| print(f'\rCopying weight file {i} of {total_weights} ({i * 100 / total_weights:.2f}%)', end="") | |
| start_time = time.time() | |
| if weights_exist: | |
| print(f'\nImported {len(weights_to_copy)} weight files') | |
| print("Copied weights from Google Drive backup to local weights folder.") | |
| else: | |
| print("\nNo weights found in Google Drive backup.") | |
| print("Google Drive backup import completed.") | |
| def backup_files(): | |
| print("\n Starting backup loop...") | |
| last_backup_timestamps_path = os.path.join(LOGS_FOLDER, 'last_backup_timestamps.txt') | |
| fully_updated = False # boolean to track if all files are up to date | |
| try: | |
| with open(last_backup_timestamps_path, 'r') as f: | |
| last_backup_timestamps = dict(line.strip().split(':') for line in f) | |
| except: | |
| last_backup_timestamps = {} | |
| while True: | |
| updated = False | |
| files_to_copy = [] | |
| files_to_delete = [] | |
| for root, dirs, files in os.walk(LOGS_FOLDER): | |
| for filename in files: | |
| if filename != 'last_backup_timestamps.txt': | |
| filepath = os.path.join(root, filename) | |
| if os.path.isfile(filepath): | |
| backup_filepath = os.path.join(GOOGLE_DRIVE_PATH, os.path.relpath(filepath, LOGS_FOLDER)) | |
| backup_folderpath = os.path.dirname(backup_filepath) | |
| if not os.path.exists(backup_folderpath): | |
| os.makedirs(backup_folderpath) | |
| print(f'Created backup folder: {backup_folderpath}', flush=True) | |
| # check if file has changed since last backup | |
| last_backup_timestamp = last_backup_timestamps.get(filepath) | |
| current_timestamp = os.path.getmtime(filepath) | |
| if last_backup_timestamp is None or float(last_backup_timestamp) < current_timestamp: | |
| files_to_copy.append((filepath, backup_filepath)) # add to list of files to copy | |
| last_backup_timestamps[filepath] = str(current_timestamp) # update last backup timestamp | |
| updated = True | |
| fully_updated = False # if a file is updated, all files are not up to date | |
| # check if any files were deleted in Colab and delete them from the backup drive | |
| for filepath in list(last_backup_timestamps.keys()): | |
| if not os.path.exists(filepath): | |
| backup_filepath = os.path.join(GOOGLE_DRIVE_PATH, os.path.relpath(filepath, LOGS_FOLDER)) | |
| if os.path.exists(backup_filepath): | |
| files_to_delete.append(backup_filepath) # add to list of files to delete | |
| del last_backup_timestamps[filepath] | |
| updated = True | |
| fully_updated = False # if a file is deleted, all files are not up to date | |
| # Copy files in batches | |
| if files_to_copy: | |
| for source, dest in files_to_copy: | |
| shutil.copy2(source, dest) | |
| print(f'Copied or updated {len(files_to_copy)} files') | |
| # Delete files in batches | |
| if files_to_delete: | |
| for file in files_to_delete: | |
| os.remove(file) | |
| print(f'Deleted {len(files_to_delete)} files') | |
| if not updated and not fully_updated: | |
| print("Files are up to date.") | |
| fully_updated = True # if all files are up to date, set the boolean to True | |
| copy_weights_folder_to_drive() | |
| with open(last_backup_timestamps_path, 'w') as f: | |
| for filepath, timestamp in last_backup_timestamps.items(): | |
| f.write(f'{filepath}:{timestamp}\n') | |
| time.sleep(15) # wait for 15 seconds before checking again | |